diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 50117066b0..dba0ed61b8 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -266,7 +266,7 @@ object TestSubscriber { /** * Fluent DSL * - * Expect a stream element during specified time, then timeout. + * Expect a stream element during specified time or timeout. */ def expectNext(d: FiniteDuration, element: I): Self = { probe.expectMsg(d, OnNext(element)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 02680ad0c3..8099575a6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -24,6 +24,15 @@ class FlowDelaySpec extends AkkaSpec { 1200.millis) should ===(1 to 10) } + "add delay to initialDelay if exists upstream" in { + Source(1 to 10).initialDelay(1.second).delay(1.second).runWith(TestSink.probe[Int]) + .request(10) + .expectNoMsg(1800.millis) + .expectNext(300.millis, 1) + .expectNextN(2 to 10) + .expectComplete() + } + "deliver element after time passed from actual receiving element" in { Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) .request(2) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index cf2155696b..d98be5f3e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -861,8 +861,8 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element + val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element val timerName = "DelayedTimer" var willStop = false @@ -878,28 +878,28 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg } case DelayOverflowStrategy.DropHead ⇒ buffer.dropHead() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.DropTail ⇒ buffer.dropTail() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.DropNew ⇒ grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) case DelayOverflowStrategy.DropBuffer ⇒ buffer.clear() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.Fail ⇒ - failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!")) - case DelayOverflowStrategy.Backpressure ⇒ //do nothing here + failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) + case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } else { - grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure) + grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } } - def grabElementAndSchedule(pullCondition: Boolean): Unit = { + def grabAndPull(pullCondition: Boolean): Unit = { buffer.enqueue((System.currentTimeMillis(), grab(in))) - if (!isTimerActive(timerName)) scheduleOnce(timerName, d) if (pullCondition) pull(in) }