From bfe8d3bb3192df6440f7bc2be20a5b348637067b Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 16 Feb 2016 19:59:52 +0100 Subject: [PATCH 1/3] fix FixedSizeBuffer wrap-around --- .../akka/stream/impl/FixedBufferSpec.scala | 17 ++++++++ .../main/scala/akka/stream/impl/Buffers.scala | 39 +++++++++++++------ 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala index a6ce59e338..bf94ccbbfa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala @@ -89,7 +89,24 @@ class FixedBufferSpec extends AkkaSpec { buf.isFull should be(true) for (elem ← 1 to size) buf.dequeue() should be(elem) } + } + "work when indexes wrap around at Int.MaxValue" in { + import language.reflectiveCalls + val buf = FixedSizeBuffer[Int](size) + + val cheat = buf.asInstanceOf[{ def readIdx_=(l: Long): Unit; def writeIdx_=(l: Long): Unit }] + cheat.readIdx_=(Int.MaxValue) + cheat.writeIdx_=(Int.MaxValue) + + for (_ ← 1 to 10) { + buf.isEmpty should be(true) + buf.isFull should be(false) + for (elem ← 1 to size) buf.enqueue(elem) + buf.isEmpty should be(false) + buf.isFull should be(true) + for (elem ← 1 to size) buf.dequeue() should be(elem) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala index 184e0bf7b2..863bafc5d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala @@ -68,23 +68,24 @@ private[akka] object FixedSizeBuffer { override def toString = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})" private val buffer = new Array[AnyRef](capacity) - protected var readIdx = 0 - protected var writeIdx = 0 - def used: Int = writeIdx - readIdx + protected var readIdx = 0L + protected var writeIdx = 0L + def used: Int = (writeIdx - readIdx).toInt def isFull: Boolean = used == capacity def isEmpty: Boolean = used == 0 def nonEmpty: Boolean = used != 0 def enqueue(elem: T): Unit = { - put(writeIdx, elem) + put(writeIdx, elem, false) writeIdx += 1 } - protected def toOffset(idx: Int): Int + // for the maintenance parameter see dropHead + protected def toOffset(idx: Long, maintenance: Boolean): Int - def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef] - def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T] + private def put(idx: Long, elem: T, maintenance: Boolean): Unit = buffer(toOffset(idx, maintenance)) = elem.asInstanceOf[AnyRef] + private def get(idx: Long): T = buffer(toOffset(idx, false)).asInstanceOf[T] def peek(): T = get(readIdx) @@ -101,23 +102,39 @@ private[akka] object FixedSizeBuffer { } def dropHead(): Unit = { - put(readIdx, null.asInstanceOf[T]) + /* + * this is the only place where readIdx is advanced, so give ModuloFixedSizeBuffer + * a chance to prevent its fatal wrap-around + */ + put(readIdx, null.asInstanceOf[T], true) readIdx += 1 } def dropTail(): Unit = { writeIdx -= 1 - put(writeIdx, null.asInstanceOf[T]) + put(writeIdx, null.asInstanceOf[T], false) } } private[akka] final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) { - override protected def toOffset(idx: Int): Int = idx % capacity + override protected def toOffset(idx: Long, maintenance: Boolean): Int = { + if (maintenance && readIdx > Int.MaxValue) { + /* + * In order to be able to run perpetually we must ensure that the counters + * don’t overrun into negative territory, so set them back by as many multiples + * of the capacity as possible when both are above Int.MaxValue. + */ + val shift = Int.MaxValue - (Int.MaxValue % capacity) + readIdx -= shift + writeIdx -= shift + } + (idx % capacity).toInt + } } private[akka] final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) { private val Mask = capacity - 1 - override protected def toOffset(idx: Int): Int = idx & Mask + override protected def toOffset(idx: Long, maintenance: Boolean): Int = idx.toInt & Mask } } From a1e7e87f38e1727edc8e55397d6cec7d4af21441 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 16 Feb 2016 20:17:24 +0100 Subject: [PATCH 2/3] fix Merge internal buffer wrap-around, fixes #19806 --- .../scala/akka/stream/scaladsl/Graph.scala | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e8086ac2c0..9270e591d5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -52,25 +52,16 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private var initialized = false - private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts) - private var pendingHead = 0 - private var pendingTail = 0 + private val pendingQueue = FixedSizeBuffer[Inlet[T]](inputPorts) + private def pending: Boolean = pendingQueue.nonEmpty private var runningUpstreams = inputPorts private def upstreamsClosed = runningUpstreams == 0 - private def pending: Boolean = pendingHead != pendingTail - override def preStart(): Unit = in.foreach(tryPull) - private def enqueue(in: Inlet[T]): Unit = { - pendingQueue(pendingTail % inputPorts) = in - pendingTail += 1 - } - private def dequeueAndDispatch(): Unit = { - val in = pendingQueue(pendingHead % inputPorts) - pendingHead += 1 + val in = pendingQueue.dequeue() push(out, grab(in)) if (upstreamsClosed && !pending) completeStage() else tryPull(in) @@ -84,7 +75,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e push(out, grab(i)) tryPull(i) } - } else enqueue(i) + } else pendingQueue.enqueue(i) } override def onUpstreamFinish() = @@ -1009,7 +1000,7 @@ object GraphDSL extends GraphApply { } private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_]) - extends PortOps[Out] { + extends PortOps[Out] { override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported From 03d0bce08ae23ae82ea258ee93f849e177c7c348 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 16 Feb 2016 20:34:20 +0100 Subject: [PATCH 3/3] fix Balance internal buffer wrap-around, see #19806 --- .../main/scala/akka/stream/scaladsl/Graph.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 9270e591d5..f4c9fd5ab2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -602,21 +602,14 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts) - private var pendingHead: Int = 0 - private var pendingTail: Int = 0 + private val pendingQueue = FixedSizeBuffer[Outlet[T]](outputPorts) + private def noPending: Boolean = pendingQueue.isEmpty private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0 private var downstreamsRunning: Int = outputPorts - private def noPending: Boolean = pendingHead == pendingTail - private def enqueue(out: Outlet[T]): Unit = { - pendingQueue(pendingTail % outputPorts) = out - pendingTail += 1 - } private def dequeueAndDispatch(): Unit = { - val out = pendingQueue(pendingHead % outputPorts) - pendingHead += 1 + val out = pendingQueue.dequeue() push(out, grab(in)) if (!noPending) pull(in) } @@ -642,9 +635,9 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext } } else { if (!hasBeenPulled(in)) pull(in) - enqueue(o) + pendingQueue.enqueue(o) } - } else enqueue(o) + } else pendingQueue.enqueue(o) } override def onDownstreamFinish() = {