diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index bf771b9c03..ab39cba32f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -67,6 +67,26 @@ class FlowDelaySpec extends StreamSpec { c.expectComplete() } + "deliver delayed elements that arrive within the same timeout as preceding group of elements" taggedAs TimingTest in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val p = TestPublisher.manualProbe[Int]() + + Source.fromPublisher(p).delay(300.millis).to(Sink.fromSubscriber(c)).run() + val cSub = c.expectSubscription() + val pSub = p.expectSubscription() + cSub.request(100) + pSub.sendNext(1) + pSub.sendNext(2) + c.expectNoMsg(200.millis) + pSub.sendNext(3) + c.expectNext(1) + c.expectNext(2) + c.expectNoMsg(150.millis) + c.expectNext(3) + pSub.sendComplete() + c.expectComplete() + } + "drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped { Await.result( Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 567c175188..66fba3a99e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1486,6 +1486,8 @@ private[stream] object Collect { @InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { private[this] def timerName = "DelayedTimer" + final val DelayPrecisionMS = 10 + override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { @@ -1563,8 +1565,14 @@ private[stream] object Collect { completeIfReady() def onPull(): Unit = { - if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) - push(out, buffer.dequeue()._2) + if (!isTimerActive(timerName) && !buffer.isEmpty) { + val waitTime = nextElementWaitTime() + if (waitTime < 0) { + push(out, buffer.dequeue()._2) + } else { + scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis) + } + } if (!isClosed(in) && !hasBeenPulled(in) && pullCondition) pull(in) @@ -1587,7 +1595,7 @@ private[stream] object Collect { if (!buffer.isEmpty) { val waitTime = nextElementWaitTime() - if (waitTime > 10) + if (waitTime > DelayPrecisionMS) scheduleOnce(timerName, waitTime.millis) } completeIfReady()