Merge pull request #19807 from akka/wip-FixedSizeBuffer-wrap-RK

Wip fixed size buffer wrap rk
This commit is contained in:
Roland Kuhn 2016-02-16 21:31:21 +01:00
commit fd800fe1ac
3 changed files with 55 additions and 37 deletions

View file

@ -89,7 +89,24 @@ class FixedBufferSpec extends AkkaSpec {
buf.isFull should be(true)
for (elem 1 to size) buf.dequeue() should be(elem)
}
}
"work when indexes wrap around at Int.MaxValue" in {
import language.reflectiveCalls
val buf = FixedSizeBuffer[Int](size)
val cheat = buf.asInstanceOf[{ def readIdx_=(l: Long): Unit; def writeIdx_=(l: Long): Unit }]
cheat.readIdx_=(Int.MaxValue)
cheat.writeIdx_=(Int.MaxValue)
for (_ 1 to 10) {
buf.isEmpty should be(true)
buf.isFull should be(false)
for (elem 1 to size) buf.enqueue(elem)
buf.isEmpty should be(false)
buf.isFull should be(true)
for (elem 1 to size) buf.dequeue() should be(elem)
}
}
}

View file

@ -68,23 +68,24 @@ private[akka] object FixedSizeBuffer {
override def toString = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
private val buffer = new Array[AnyRef](capacity)
protected var readIdx = 0
protected var writeIdx = 0
def used: Int = writeIdx - readIdx
protected var readIdx = 0L
protected var writeIdx = 0L
def used: Int = (writeIdx - readIdx).toInt
def isFull: Boolean = used == capacity
def isEmpty: Boolean = used == 0
def nonEmpty: Boolean = used != 0
def enqueue(elem: T): Unit = {
put(writeIdx, elem)
put(writeIdx, elem, false)
writeIdx += 1
}
protected def toOffset(idx: Int): Int
// for the maintenance parameter see dropHead
protected def toOffset(idx: Long, maintenance: Boolean): Int
def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
private def put(idx: Long, elem: T, maintenance: Boolean): Unit = buffer(toOffset(idx, maintenance)) = elem.asInstanceOf[AnyRef]
private def get(idx: Long): T = buffer(toOffset(idx, false)).asInstanceOf[T]
def peek(): T = get(readIdx)
@ -101,23 +102,39 @@ private[akka] object FixedSizeBuffer {
}
def dropHead(): Unit = {
put(readIdx, null.asInstanceOf[T])
/*
* this is the only place where readIdx is advanced, so give ModuloFixedSizeBuffer
* a chance to prevent its fatal wrap-around
*/
put(readIdx, null.asInstanceOf[T], true)
readIdx += 1
}
def dropTail(): Unit = {
writeIdx -= 1
put(writeIdx, null.asInstanceOf[T])
put(writeIdx, null.asInstanceOf[T], false)
}
}
private[akka] final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
override protected def toOffset(idx: Int): Int = idx % capacity
override protected def toOffset(idx: Long, maintenance: Boolean): Int = {
if (maintenance && readIdx > Int.MaxValue) {
/*
* In order to be able to run perpetually we must ensure that the counters
* dont overrun into negative territory, so set them back by as many multiples
* of the capacity as possible when both are above Int.MaxValue.
*/
val shift = Int.MaxValue - (Int.MaxValue % capacity)
readIdx -= shift
writeIdx -= shift
}
(idx % capacity).toInt
}
}
private[akka] final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
private val Mask = capacity - 1
override protected def toOffset(idx: Int): Int = idx & Mask
override protected def toOffset(idx: Long, maintenance: Boolean): Int = idx.toInt & Mask
}
}

View file

@ -52,25 +52,16 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var initialized = false
private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts)
private var pendingHead = 0
private var pendingTail = 0
private val pendingQueue = FixedSizeBuffer[Inlet[T]](inputPorts)
private def pending: Boolean = pendingQueue.nonEmpty
private var runningUpstreams = inputPorts
private def upstreamsClosed = runningUpstreams == 0
private def pending: Boolean = pendingHead != pendingTail
override def preStart(): Unit = in.foreach(tryPull)
private def enqueue(in: Inlet[T]): Unit = {
pendingQueue(pendingTail % inputPorts) = in
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val in = pendingQueue(pendingHead % inputPorts)
pendingHead += 1
val in = pendingQueue.dequeue()
push(out, grab(in))
if (upstreamsClosed && !pending) completeStage()
else tryPull(in)
@ -84,7 +75,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
push(out, grab(i))
tryPull(i)
}
} else enqueue(i)
} else pendingQueue.enqueue(i)
}
override def onUpstreamFinish() =
@ -611,21 +602,14 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts)
private var pendingHead: Int = 0
private var pendingTail: Int = 0
private val pendingQueue = FixedSizeBuffer[Outlet[T]](outputPorts)
private def noPending: Boolean = pendingQueue.isEmpty
private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0
private var downstreamsRunning: Int = outputPorts
private def noPending: Boolean = pendingHead == pendingTail
private def enqueue(out: Outlet[T]): Unit = {
pendingQueue(pendingTail % outputPorts) = out
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val out = pendingQueue(pendingHead % outputPorts)
pendingHead += 1
val out = pendingQueue.dequeue()
push(out, grab(in))
if (!noPending) pull(in)
}
@ -651,9 +635,9 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext
}
} else {
if (!hasBeenPulled(in)) pull(in)
enqueue(o)
pendingQueue.enqueue(o)
}
} else enqueue(o)
} else pendingQueue.enqueue(o)
}
override def onDownstreamFinish() = {
@ -1009,7 +993,7 @@ object GraphDSL extends GraphApply {
}
private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_])
extends PortOps[Out] {
extends PortOps[Out] {
override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported