diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index e094929011..adcc979174 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -13,12 +13,14 @@ import akka.stream.{ ActorMaterializer, Attributes, BufferOverflowException, Del import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.stream.ThrottleMode class FlowDelaySpec extends StreamSpec { implicit val materializer = ActorMaterializer() "A Delay" must { + "deliver elements with some time shift" in { Await.result( Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head), @@ -156,5 +158,16 @@ class FlowDelaySpec extends StreamSpec { expectMsg(Done) } + "not overflow buffer when DelayOverflowStrategy.backpressure" in { + val probe = Source(1 to 6).delay(100.millis, DelayOverflowStrategy.backpressure) + .withAttributes(Attributes.inputBuffer(2, 2)) + .throttle(1, 200.millis, 1, ThrottleMode.Shaping) + .runWith(TestSink.probe) + + probe.request(10) + .expectNextN(1 to 6) + .expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 96c7ec907c..57d63a9ee6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl.fusing +import java.util.concurrent.TimeUnit.NANOSECONDS import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } @@ -1310,74 +1311,101 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") case Some(InputBuffer(min, max)) ⇒ max } + val delayMillis = d.toMillis var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element - var willStop = false override def preStart(): Unit = buffer = BufferImpl(size, materializer) - //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full - def onPush(): Unit = { - if (buffer.isFull) strategy match { - case EmitEarly ⇒ + val onPushWhenBufferFull: () ⇒ Unit = strategy match { + case EmitEarly ⇒ + () ⇒ { if (!isTimerActive(timerName)) push(out, buffer.dequeue()._2) else { cancelTimer(timerName) onTimer(timerName) } - case DropHead ⇒ + } + case DropHead ⇒ + () ⇒ { buffer.dropHead() - grabAndPull(true) - case DropTail ⇒ + grabAndPull() + } + case DropTail ⇒ + () ⇒ { buffer.dropTail() - grabAndPull(true) - case DropNew ⇒ + grabAndPull() + } + case DropNew ⇒ + () ⇒ { grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) - case DropBuffer ⇒ + } + case DropBuffer ⇒ + () ⇒ { buffer.clear() - grabAndPull(true) - case Fail ⇒ + grabAndPull() + } + case Fail ⇒ + () ⇒ { failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) - case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") - } + } + case Backpressure ⇒ + () ⇒ { + throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") + } + } + + def onPush(): Unit = { + if (buffer.isFull) + onPushWhenBufferFull() else { - grabAndPull(strategy != Backpressure || buffer.used < size - 1) - if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + grabAndPull() + if (!isTimerActive(timerName)) { + scheduleOnce(timerName, d) + } } } - def grabAndPull(pullCondition: Boolean): Unit = { + def pullCondition: Boolean = + strategy != Backpressure || buffer.used < size + + def grabAndPull(): Unit = { buffer.enqueue((System.nanoTime(), grab(in))) if (pullCondition) pull(in) } - override def onUpstreamFinish(): Unit = { - if (isAvailable(out) && isTimerActive(timerName)) willStop = true - else completeStage() - } + override def onUpstreamFinish(): Unit = + completeIfReady() def onPull(): Unit = { if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) push(out, buffer.dequeue()._2) - if (!willStop && !hasBeenPulled(in)) pull(in) + if (!isClosed(in) && !hasBeenPulled(in) && pullCondition) + pull(in) + completeIfReady() } setHandler(in, this) setHandler(out, this) - def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() + def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage() - def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000 + def nextElementWaitTime(): Long = { + delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1) + } final override protected def onTimer(key: Any): Unit = { - push(out, buffer.dequeue()._2) + if (isAvailable(out)) + push(out, buffer.dequeue()._2) + if (!buffer.isEmpty) { val waitTime = nextElementWaitTime() - if (waitTime > 10) scheduleOnce(timerName, waitTime.millis) + if (waitTime > 10) + scheduleOnce(timerName, waitTime.millis) } completeIfReady() }