+str #18556 add delay combinator

This commit is contained in:
Alexander Golubev 2015-11-27 15:46:35 -05:00
parent 83d3143236
commit d5cae10a67
3 changed files with 20 additions and 11 deletions

View file

@ -861,8 +861,8 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element
val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
val timerName = "DelayedTimer"
var willStop = false
@ -878,28 +878,28 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg
}
case DelayOverflowStrategy.DropHead
buffer.dropHead()
grabElementAndSchedule(true)
grabAndPull(true)
case DelayOverflowStrategy.DropTail
buffer.dropTail()
grabElementAndSchedule(true)
grabAndPull(true)
case DelayOverflowStrategy.DropNew
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
case DelayOverflowStrategy.DropBuffer
buffer.clear()
grabElementAndSchedule(true)
grabAndPull(true)
case DelayOverflowStrategy.Fail
failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!"))
case DelayOverflowStrategy.Backpressure //do nothing here
failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case DelayOverflowStrategy.Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
else {
grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure)
grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
}
}
def grabElementAndSchedule(pullCondition: Boolean): Unit = {
def grabAndPull(pullCondition: Boolean): Unit = {
buffer.enqueue((System.currentTimeMillis(), grab(in)))
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
if (pullCondition) pull(in)
}