Schedule timer for remaining delayed elements #22416

This commit is contained in:
Thomas Alton 2017-03-30 09:27:01 -05:00 committed by Johan Andrén
parent 2714dca5e1
commit 626d07edca
2 changed files with 31 additions and 3 deletions

View file

@ -67,6 +67,26 @@ class FlowDelaySpec extends StreamSpec {
c.expectComplete() c.expectComplete()
} }
"deliver delayed elements that arrive within the same timeout as preceding group of elements" taggedAs TimingTest in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]()
val p = TestPublisher.manualProbe[Int]()
Source.fromPublisher(p).delay(300.millis).to(Sink.fromSubscriber(c)).run()
val cSub = c.expectSubscription()
val pSub = p.expectSubscription()
cSub.request(100)
pSub.sendNext(1)
pSub.sendNext(2)
c.expectNoMsg(200.millis)
pSub.sendNext(3)
c.expectNext(1)
c.expectNext(2)
c.expectNoMsg(150.millis)
c.expectNext(3)
pSub.sendComplete()
c.expectComplete()
}
"drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped { "drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped {
Await.result( Await.result(
Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16)) Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16))

View file

@ -1486,6 +1486,8 @@ private[stream] object Collect {
@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { @InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer" private[this] def timerName = "DelayedTimer"
final val DelayPrecisionMS = 10
override def initialAttributes: Attributes = DefaultAttributes.delay override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
@ -1563,8 +1565,14 @@ private[stream] object Collect {
completeIfReady() completeIfReady()
def onPull(): Unit = { def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) if (!isTimerActive(timerName) && !buffer.isEmpty) {
push(out, buffer.dequeue()._2) val waitTime = nextElementWaitTime()
if (waitTime < 0) {
push(out, buffer.dequeue()._2)
} else {
scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis)
}
}
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition) if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
pull(in) pull(in)
@ -1587,7 +1595,7 @@ private[stream] object Collect {
if (!buffer.isEmpty) { if (!buffer.isEmpty) {
val waitTime = nextElementWaitTime() val waitTime = nextElementWaitTime()
if (waitTime > 10) if (waitTime > DelayPrecisionMS)
scheduleOnce(timerName, waitTime.millis) scheduleOnce(timerName, waitTime.millis)
} }
completeIfReady() completeIfReady()