Flow.delayWith allows custom delay for each element. (#25000)

This commit is contained in:
Jean-Baptiste Giraudeau 2019-11-27 17:30:56 +01:00 committed by Johan Andrén
parent db141d0373
commit 6d893fb571
12 changed files with 500 additions and 44 deletions

View file

@ -15,7 +15,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.stream.scaladsl.{ DelayStrategy, Flow, Keep, Source }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
@ -1700,34 +1700,38 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
@InternalApi private[akka] final class Delay[T](
private[this] val delayStrategySupplier: () => DelayStrategy[_ >: T],
private[this] val overflowStrategy: DelayOverflowStrategy)
extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
final val DelayPrecisionMS = 10
private[this] val DelayPrecisionMS = 10
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
val delayMillis = d.toMillis
private[this] val delayStrategy = delayStrategySupplier()
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
private[this] var buffer
: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp of expected push with upstream element
override def preStart(): Unit = buffer = BufferImpl(size, inheritedAttributes)
val onPushWhenBufferFull: () => Unit = strategy match {
private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match {
case EmitEarly =>
() => {
if (!isTimerActive(timerName))
if (isAvailable(out)) {
if (isTimerActive(timerName))
cancelTimer(timerName)
push(out, buffer.dequeue()._2)
else {
cancelTimer(timerName)
onTimer(timerName)
grabAndPull()
completeIfReady()
}
grabAndPull()
}
case _: DropHead =>
() => {
@ -1742,7 +1746,7 @@ private[stream] object Collect {
case _: DropNew =>
() => {
grab(in)
pull(in)
if (pullCondition) pull(in)
}
case _: DropBuffer =>
() => {
@ -1751,7 +1755,7 @@ private[stream] object Collect {
}
case _: Fail =>
() => {
failStage(BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
failStage(new BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
}
case _: Backpressure =>
() => {
@ -1765,28 +1769,22 @@ private[stream] object Collect {
else {
grabAndPull()
if (!isTimerActive(timerName)) {
// schedule a timer for the full-delay `d` only if the buffer is empty, because otherwise a
// full-length timer will starve subsequent `onPull` callbacks, preventing overdue elements
// to be discharged.
if (buffer.isEmpty)
scheduleOnce(timerName, d)
else
scheduleOnce(timerName, Math.max(DelayPrecisionMS, nextElementWaitTime()).millis)
val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS && isAvailable(out)) {
push(out, buffer.dequeue()._2)
completeIfReady()
} else
scheduleOnce(timerName, waitTime.millis)
}
}
}
def pullCondition: Boolean = strategy match {
case EmitEarly =>
// when buffer is full we can only emit early if out is available
buffer.used < size || isAvailable(out)
case _ =>
!strategy.isBackpressure || buffer.used < size
}
private def pullCondition: Boolean =
!overflowStrategy.isBackpressure || buffer.used < size
def grabAndPull(): Unit = {
if (buffer.used == size) throw new IllegalStateException("Trying to enqueue but buffer is full")
buffer.enqueue((System.nanoTime(), grab(in)))
private def grabAndPull(): Unit = {
val element = grab(in)
buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element))
if (pullCondition) pull(in)
}
@ -1796,11 +1794,10 @@ private[stream] object Collect {
def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime < 0) {
if (waitTime <= DelayPrecisionMS)
push(out, buffer.dequeue()._2)
} else {
scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis)
}
else
scheduleOnce(timerName, waitTime.millis)
}
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
@ -1814,19 +1811,14 @@ private[stream] object Collect {
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
def nextElementWaitTime(): Long = {
delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1)
private def nextElementWaitTime(): Long = {
NANOSECONDS.toMillis(buffer.peek()._1 - System.nanoTime())
}
final override protected def onTimer(key: Any): Unit = {
if (isAvailable(out))
push(out, buffer.dequeue()._2)
if (!buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime > DelayPrecisionMS)
scheduleOnce(timerName, waitTime.millis)
}
completeIfReady()
}
}
@ -2266,7 +2258,6 @@ private[stream] object Collect {
matVal
}
}
(stageLogic, matPromise.future)
}