diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index 210f72b1cf..5702f46373 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -88,7 +88,7 @@ private[akka] object OverflowStrategies { * INTERNAL API */ private[akka] case object EmitEarly extends DelayOverflowStrategy { - private[akka] override def isBackpressure: Boolean = false + private[akka] override def isBackpressure: Boolean = true } } @@ -132,6 +132,7 @@ object DelayOverflowStrategy { /** * If the buffer is full when a new element is available this strategy send next element downstream without waiting + * Will backpressure if downstream is not ready. */ def emitEarly: DelayOverflowStrategy = EmitEarly diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 1b6d48eecc..6e2788ae37 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1700,37 +1700,46 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi private[akka] final class Delay[T]( - private[this] val delayStrategySupplier: () => DelayStrategy[_ >: T], - private[this] val overflowStrategy: DelayOverflowStrategy) - extends SimpleLinearGraphStage[T] { - private[this] def timerName = "DelayedTimer" +@InternalApi object Delay { + private val TimerName = "DelayedTimer" + private val DelayPrecisionMS = 10 +} - private[this] val DelayPrecisionMS = 10 +/** + * INTERNAL API + */ +@InternalApi private[akka] final class Delay[T]( + delayStrategySupplier: () => DelayStrategy[T], + overflowStrategy: DelayOverflowStrategy) + extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { + import Delay._ + private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max private[this] val delayStrategy = delayStrategySupplier() - private[this] var buffer - : BufferImpl[(Long, T)] = _ // buffer has pairs timestamp of expected push with upstream element - - override def preStart(): Unit = buffer = BufferImpl(size, inheritedAttributes) + // buffer has pairs of timestamp of expected push and element + private[this] val buffer = BufferImpl[(Long, T)](size, inheritedAttributes) private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match { case EmitEarly => () => { if (isAvailable(out)) { - if (isTimerActive(timerName)) - cancelTimer(timerName) + if (isTimerActive(TimerName)) { + cancelTimer(TimerName) + } push(out, buffer.dequeue()._2) grabAndPull() completeIfReady() + } else { + throw new IllegalStateException( + "Was configured to emitEarly and got element when out is not ready and buffer is full, should not be possible.") } } case _: DropHead => @@ -1746,7 +1755,7 @@ private[stream] object Collect { case _: DropNew => () => { grab(in) - if (pullCondition) pull(in) + if (shouldPull) pull(in) } case _: DropBuffer => () => { @@ -1768,39 +1777,41 @@ private[stream] object Collect { onPushWhenBufferFull() else { grabAndPull() - if (!isTimerActive(timerName)) { + if (!isTimerActive(TimerName)) { val waitTime = nextElementWaitTime() if (waitTime <= DelayPrecisionMS && isAvailable(out)) { push(out, buffer.dequeue()._2) completeIfReady() } else - scheduleOnce(timerName, waitTime.millis) + scheduleOnce(TimerName, waitTime.millis) } } } - private def pullCondition: Boolean = - !overflowStrategy.isBackpressure || buffer.used < size + private def shouldPull: Boolean = + buffer.used < size || !overflowStrategy.isBackpressure || + // we can only emit early if output is ready + (overflowStrategy == EmitEarly && isAvailable(out)) private def grabAndPull(): Unit = { val element = grab(in) buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element)) - if (pullCondition) pull(in) + if (shouldPull) pull(in) } override def onUpstreamFinish(): Unit = completeIfReady() def onPull(): Unit = { - if (!isTimerActive(timerName) && !buffer.isEmpty) { + if (!isTimerActive(TimerName) && !buffer.isEmpty) { val waitTime = nextElementWaitTime() if (waitTime <= DelayPrecisionMS) push(out, buffer.dequeue()._2) else - scheduleOnce(timerName, waitTime.millis) + scheduleOnce(TimerName, waitTime.millis) } - if (!isClosed(in) && !hasBeenPulled(in) && pullCondition) + if (!isClosed(in) && !hasBeenPulled(in) && shouldPull) pull(in) completeIfReady() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d1e81f3122..e8489aa895 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1600,8 +1600,10 @@ trait FlowOps[+Out, +Mat] { * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = - via(new Delay[Out](() => DelayStrategy.fixedDelay(of), strategy)) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = { + val fixedDelay = DelayStrategy.fixedDelay(of) + via(new Delay[Out](() => fixedDelay, strategy)) + } /** * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.