Fix an issue of prolonged delay in the Delay stage #26470
This commit is contained in:
parent
51041b1faf
commit
44dcfe057a
2 changed files with 43 additions and 2 deletions
|
|
@ -12,6 +12,8 @@ import akka.stream.testkit.scaladsl.TestSink
|
|||
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
|
||||
import akka.stream._
|
||||
import akka.testkit.TimingTest
|
||||
import org.scalatest.concurrent.PatienceConfiguration
|
||||
import org.scalatest.time.{ Milliseconds, Span }
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -220,6 +222,39 @@ class FlowDelaySpec extends StreamSpec {
|
|||
probe.request(10).expectNextN(1 to 2).expectComplete()
|
||||
}
|
||||
|
||||
"not block overdue elements from being pushed to downstream stages" in {
|
||||
val N = 128
|
||||
val batchSize = 16
|
||||
val delayMillis = 500
|
||||
|
||||
val elements = (1 to N).iterator
|
||||
|
||||
val future = Source
|
||||
.tick(0.millis, 10.millis, 1)
|
||||
.mapConcat(_ => (1 to batchSize).map(_ => elements.next()))
|
||||
.take(N)
|
||||
.map { elem =>
|
||||
System.nanoTime() -> elem
|
||||
}
|
||||
.delay(delayMillis.millis, DelayOverflowStrategy.backpressure)
|
||||
.withAttributes(Attributes.inputBuffer(4, 4))
|
||||
.map {
|
||||
case (startTimestamp, elem) =>
|
||||
(System.nanoTime() - startTimestamp) / 1e6 -> elem
|
||||
}
|
||||
.runWith(Sink.seq)
|
||||
|
||||
val results = future.futureValue(PatienceConfiguration.Timeout(Span(60000, Milliseconds)))
|
||||
results.length shouldBe N
|
||||
|
||||
// check if every elements are delayed by roughly the same amount of time
|
||||
val delayHistogram = results.map(x => Math.floor(x._1 / delayMillis) * delayMillis).groupBy(identity).map {
|
||||
case (bucket, delays) => (bucket, delays.length)
|
||||
}
|
||||
|
||||
delayHistogram shouldEqual Map(delayMillis.toDouble -> N)
|
||||
}
|
||||
|
||||
// repeater for #27095
|
||||
"not throw NPE when using EmitEarly and buffer is full" taggedAs TimingTest in {
|
||||
val result =
|
||||
|
|
|
|||
|
|
@ -1747,7 +1747,7 @@ private[stream] object Collect {
|
|||
case _: DropNew =>
|
||||
() => {
|
||||
grab(in)
|
||||
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
|
||||
pull(in)
|
||||
}
|
||||
case _: DropBuffer =>
|
||||
() => {
|
||||
|
|
@ -1770,7 +1770,13 @@ private[stream] object Collect {
|
|||
else {
|
||||
grabAndPull()
|
||||
if (!isTimerActive(timerName)) {
|
||||
scheduleOnce(timerName, d)
|
||||
// schedule a timer for the full-delay `d` only if the buffer is empty, because otherwise a
|
||||
// full-length timer will starve subsequent `onPull` callbacks, preventing overdue elements
|
||||
// to be discharged.
|
||||
if (buffer.isEmpty)
|
||||
scheduleOnce(timerName, d)
|
||||
else
|
||||
scheduleOnce(timerName, Math.max(DelayPrecisionMS, nextElementWaitTime()).millis)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue