diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index a98e2389da..f14c67ac86 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -3,12 +3,14 @@ */ package akka.stream.scaladsl +import akka.Done import akka.stream.Attributes._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ TestPublisher, TestSubscriber } -import akka.stream.{ BufferOverflowException, DelayOverflowStrategy, ActorMaterializer } -import scala.concurrent.Await +import akka.stream.{ ActorMaterializer, Attributes, BufferOverflowException, DelayOverflowStrategy } + +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.testkit.AkkaSpec @@ -88,7 +90,7 @@ class FlowDelaySpec extends AkkaSpec { } "pass elements with delay through normally in backpressured mode" in assertAllStagesStopped { - Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int]) + Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).withAttributes(inputBuffer(1, 1)).runWith(TestSink.probe[Int]) .request(5) .expectNoMsg(200.millis) .expectNext(200.millis, 1) @@ -123,5 +125,37 @@ class FlowDelaySpec extends AkkaSpec { //fail will terminate despite of non empty internal buffer pSub.sendError(new RuntimeException() with NoStackTrace) } + + "properly delay according to buffer size" in { + import akka.pattern.pipe + import system.dispatcher + + // With a buffer size of 1, delays add up + Source(1 to 5) + .delay(500.millis, DelayOverflowStrategy.backpressure) + .withAttributes(Attributes.inputBuffer(initial = 1, max = 1)) + .runWith(Sink.ignore).pipeTo(testActor) + + expectNoMsg(2.seconds) + expectMsg(Done) + + // With a buffer large enough to hold all arriving elements, delays don't add up + Source(1 to 100) + .delay(1.second, DelayOverflowStrategy.backpressure) + .withAttributes(Attributes.inputBuffer(initial = 100, max = 100)) + .runWith(Sink.ignore).pipeTo(testActor) + + expectMsg(Done) + + // Delays that are already present are preserved when buffer is large enough + Source.tick(100.millis, 100.millis, ()).take(10) + .delay(1.second, DelayOverflowStrategy.backpressure) + .withAttributes(Attributes.inputBuffer(initial = 10, max = 10)) + .runWith(Sink.ignore).pipeTo(testActor) + + expectNoMsg(900.millis) + expectMsg(Done) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index b943026fe7..48e664f0df 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -84,7 +84,7 @@ private[stream] object Stages { val repeat = name("repeat") val unfold = name("unfold") val unfoldAsync = name("unfoldAsync") - val delay = name("delay") and inputBuffer(16, 16) + val delay = name("delay") val terminationWatcher = name("terminationWatcher") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 017a1d3da6..f90e34e4bb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1125,7 +1125,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } else { - grabAndPull(strategy != Backpressure || buffer.capacity < size - 1) + grabAndPull(strategy != Backpressure || buffer.used < size - 1) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } }