Delay should not pull when buffer is full with Backpressure strategy, #21334

Additionally
* nano time conversion and calculation fix
* refactoring of Delay.onPush
This commit is contained in:
Patrik Nordwall 2016-09-13 16:10:49 +02:00 committed by Johan Andrén
parent 86c15c04e3
commit 79d8ec87fc
2 changed files with 68 additions and 27 deletions

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
@ -1310,74 +1311,101 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
case None throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
case Some(InputBuffer(min, max)) max
}
val delayMillis = d.toMillis
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
var willStop = false
override def preStart(): Unit = buffer = BufferImpl(size, materializer)
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
def onPush(): Unit = {
if (buffer.isFull) strategy match {
case EmitEarly
val onPushWhenBufferFull: () Unit = strategy match {
case EmitEarly
() {
if (!isTimerActive(timerName))
push(out, buffer.dequeue()._2)
else {
cancelTimer(timerName)
onTimer(timerName)
}
case DropHead
}
case DropHead
() {
buffer.dropHead()
grabAndPull(true)
case DropTail
grabAndPull()
}
case DropTail
() {
buffer.dropTail()
grabAndPull(true)
case DropNew
grabAndPull()
}
case DropNew
() {
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
case DropBuffer
}
case DropBuffer
() {
buffer.clear()
grabAndPull(true)
case Fail
grabAndPull()
}
case Fail
() {
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
}
case Backpressure
() {
throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
}
def onPush(): Unit = {
if (buffer.isFull)
onPushWhenBufferFull()
else {
grabAndPull(strategy != Backpressure || buffer.used < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
grabAndPull()
if (!isTimerActive(timerName)) {
scheduleOnce(timerName, d)
}
}
}
def grabAndPull(pullCondition: Boolean): Unit = {
def pullCondition: Boolean =
strategy != Backpressure || buffer.used < size
def grabAndPull(): Unit = {
buffer.enqueue((System.nanoTime(), grab(in)))
if (pullCondition) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage()
}
override def onUpstreamFinish(): Unit =
completeIfReady()
def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0)
push(out, buffer.dequeue()._2)
if (!willStop && !hasBeenPulled(in)) pull(in)
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
pull(in)
completeIfReady()
}
setHandler(in, this)
setHandler(out, this)
def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000
def nextElementWaitTime(): Long = {
delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1)
}
final override protected def onTimer(key: Any): Unit = {
push(out, buffer.dequeue()._2)
if (isAvailable(out))
push(out, buffer.dequeue()._2)
if (!buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime > 10) scheduleOnce(timerName, waitTime.millis)
if (waitTime > 10)
scheduleOnce(timerName, waitTime.millis)
}
completeIfReady()
}