Delay operator emit early fix #28269

* Bugfix for delay stage with EmitEarly #28269

Emit early would make it pull eagerly even if out was not available,
when element arrived and buffer was full and out not available that
element would be left in the input port and never end up in the buffer.

* Additional cleanup of the Delay stage
This commit is contained in:
Johan Andrén 2019-11-29 14:46:51 +01:00 committed by GitHub
parent bfabdebace
commit 583fe3e2a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 24 deletions

View file

@ -88,7 +88,7 @@ private[akka] object OverflowStrategies {
* INTERNAL API * INTERNAL API
*/ */
private[akka] case object EmitEarly extends DelayOverflowStrategy { private[akka] case object EmitEarly extends DelayOverflowStrategy {
private[akka] override def isBackpressure: Boolean = false private[akka] override def isBackpressure: Boolean = true
} }
} }
@ -132,6 +132,7 @@ object DelayOverflowStrategy {
/** /**
* If the buffer is full when a new element is available this strategy send next element downstream without waiting * If the buffer is full when a new element is available this strategy send next element downstream without waiting
* Will backpressure if downstream is not ready.
*/ */
def emitEarly: DelayOverflowStrategy = EmitEarly def emitEarly: DelayOverflowStrategy = EmitEarly

View file

@ -1700,37 +1700,46 @@ private[stream] object Collect {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class Delay[T]( @InternalApi object Delay {
private[this] val delayStrategySupplier: () => DelayStrategy[_ >: T], private val TimerName = "DelayedTimer"
private[this] val overflowStrategy: DelayOverflowStrategy) private val DelayPrecisionMS = 10
extends SimpleLinearGraphStage[T] { }
private[this] def timerName = "DelayedTimer"
private[this] val DelayPrecisionMS = 10 /**
* INTERNAL API
*/
@InternalApi private[akka] final class Delay[T](
delayStrategySupplier: () => DelayStrategy[T],
overflowStrategy: DelayOverflowStrategy)
extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.delay override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler { new TimerGraphStageLogic(shape) with InHandler with OutHandler {
import Delay._
private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
private[this] val delayStrategy = delayStrategySupplier() private[this] val delayStrategy = delayStrategySupplier()
private[this] var buffer // buffer has pairs of timestamp of expected push and element
: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp of expected push with upstream element private[this] val buffer = BufferImpl[(Long, T)](size, inheritedAttributes)
override def preStart(): Unit = buffer = BufferImpl(size, inheritedAttributes)
private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match { private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match {
case EmitEarly => case EmitEarly =>
() => { () => {
if (isAvailable(out)) { if (isAvailable(out)) {
if (isTimerActive(timerName)) if (isTimerActive(TimerName)) {
cancelTimer(timerName) cancelTimer(TimerName)
}
push(out, buffer.dequeue()._2) push(out, buffer.dequeue()._2)
grabAndPull() grabAndPull()
completeIfReady() completeIfReady()
} else {
throw new IllegalStateException(
"Was configured to emitEarly and got element when out is not ready and buffer is full, should not be possible.")
} }
} }
case _: DropHead => case _: DropHead =>
@ -1746,7 +1755,7 @@ private[stream] object Collect {
case _: DropNew => case _: DropNew =>
() => { () => {
grab(in) grab(in)
if (pullCondition) pull(in) if (shouldPull) pull(in)
} }
case _: DropBuffer => case _: DropBuffer =>
() => { () => {
@ -1768,39 +1777,41 @@ private[stream] object Collect {
onPushWhenBufferFull() onPushWhenBufferFull()
else { else {
grabAndPull() grabAndPull()
if (!isTimerActive(timerName)) { if (!isTimerActive(TimerName)) {
val waitTime = nextElementWaitTime() val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS && isAvailable(out)) { if (waitTime <= DelayPrecisionMS && isAvailable(out)) {
push(out, buffer.dequeue()._2) push(out, buffer.dequeue()._2)
completeIfReady() completeIfReady()
} else } else
scheduleOnce(timerName, waitTime.millis) scheduleOnce(TimerName, waitTime.millis)
} }
} }
} }
private def pullCondition: Boolean = private def shouldPull: Boolean =
!overflowStrategy.isBackpressure || buffer.used < size buffer.used < size || !overflowStrategy.isBackpressure ||
// we can only emit early if output is ready
(overflowStrategy == EmitEarly && isAvailable(out))
private def grabAndPull(): Unit = { private def grabAndPull(): Unit = {
val element = grab(in) val element = grab(in)
buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element)) buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element))
if (pullCondition) pull(in) if (shouldPull) pull(in)
} }
override def onUpstreamFinish(): Unit = override def onUpstreamFinish(): Unit =
completeIfReady() completeIfReady()
def onPull(): Unit = { def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty) { if (!isTimerActive(TimerName) && !buffer.isEmpty) {
val waitTime = nextElementWaitTime() val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS) if (waitTime <= DelayPrecisionMS)
push(out, buffer.dequeue()._2) push(out, buffer.dequeue()._2)
else else
scheduleOnce(timerName, waitTime.millis) scheduleOnce(TimerName, waitTime.millis)
} }
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition) if (!isClosed(in) && !hasBeenPulled(in) && shouldPull)
pull(in) pull(in)
completeIfReady() completeIfReady()

View file

@ -1600,8 +1600,10 @@ trait FlowOps[+Out, +Mat] {
* @param of time to shift all messages * @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = {
via(new Delay[Out](() => DelayStrategy.fixedDelay(of), strategy)) val fixedDelay = DelayStrategy.fixedDelay(of)
via(new Delay[Out](() => fixedDelay, strategy))
}
/** /**
* Shifts elements emission in time by an amount individually determined through delay strategy a specified amount. * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.