diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index b17b7fe85d..6a4a058c1b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -12,6 +12,8 @@ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream._ import akka.testkit.TimingTest +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.time.{ Milliseconds, Span } import scala.concurrent.Await import scala.concurrent.duration._ @@ -220,6 +222,39 @@ class FlowDelaySpec extends StreamSpec { probe.request(10).expectNextN(1 to 2).expectComplete() } + "not block overdue elements from being pushed to downstream stages" in { + val N = 128 + val batchSize = 16 + val delayMillis = 500 + + val elements = (1 to N).iterator + + val future = Source + .tick(0.millis, 10.millis, 1) + .mapConcat(_ => (1 to batchSize).map(_ => elements.next())) + .take(N) + .map { elem => + System.nanoTime() -> elem + } + .delay(delayMillis.millis, DelayOverflowStrategy.backpressure) + .withAttributes(Attributes.inputBuffer(4, 4)) + .map { + case (startTimestamp, elem) => + (System.nanoTime() - startTimestamp) / 1e6 -> elem + } + .runWith(Sink.seq) + + val results = future.futureValue(PatienceConfiguration.Timeout(Span(60000, Milliseconds))) + results.length shouldBe N + + // check if every elements are delayed by roughly the same amount of time + val delayHistogram = results.map(x => Math.floor(x._1 / delayMillis) * delayMillis).groupBy(identity).map { + case (bucket, delays) => (bucket, delays.length) + } + + delayHistogram shouldEqual Map(delayMillis.toDouble -> N) + } + // repeater for #27095 "not throw NPE when using EmitEarly and buffer is full" taggedAs TimingTest in { val result = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d1e9e53403..a1bcea8b12 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1747,7 +1747,7 @@ private[stream] object Collect { case _: DropNew => () => { grab(in) - if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + pull(in) } case _: DropBuffer => () => { @@ -1770,7 +1770,13 @@ private[stream] object Collect { else { grabAndPull() if (!isTimerActive(timerName)) { - scheduleOnce(timerName, d) + // schedule a timer for the full-delay `d` only if the buffer is empty, because otherwise a + // full-length timer will starve subsequent `onPull` callbacks, preventing overdue elements + // to be discharged. + if (buffer.isEmpty) + scheduleOnce(timerName, d) + else + scheduleOnce(timerName, Math.max(DelayPrecisionMS, nextElementWaitTime()).millis) } } }