* "must send elements downstream as soon as time comes" * it failed with perThrottleInterval: Map(0 -> List((0,1), (499,2)), 2 -> List((1000,3), (1499,4)), 3 -> List((1999,5))) * round the interval groups * use dilated
This commit is contained in:
parent
c92489f9ed
commit
93a748b952
1 changed files with 7 additions and 6 deletions
|
|
@ -16,6 +16,7 @@ import akka.stream.ThrottleMode.{ Enforcing, Shaping }
|
|||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.scaladsl.StreamTestKit._
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.TestDuration
|
||||
import akka.testkit.TimingTest
|
||||
import akka.util.ByteString
|
||||
|
||||
|
|
@ -110,19 +111,19 @@ class FlowThrottleSpec extends StreamSpec("""
|
|||
}
|
||||
|
||||
"send elements downstream as soon as time comes" in assertAllStagesStopped {
|
||||
val throttleInterval = 500.millis
|
||||
val throttleInterval = 500.millis.dilated
|
||||
val elementsAndTimestampsMs = Source(1 to 5)
|
||||
.throttle(1, throttleInterval)
|
||||
.runFold(Nil: List[(Long, Int)]) { (acc, n) =>
|
||||
(System.nanoTime() / 1000000, n) :: acc
|
||||
}
|
||||
.futureValue(timeout(5.seconds))
|
||||
.futureValue(timeout(5.seconds.dilated))
|
||||
.reverse
|
||||
|
||||
val startMs = elementsAndTimestampsMs.head._1
|
||||
val elemsAndTimeFromStart = elementsAndTimestampsMs.map { case (ts, n) => (ts - startMs, n) }
|
||||
val perThrottleInterval = elemsAndTimeFromStart.groupBy {
|
||||
case (fromStart, _) => fromStart / throttleInterval.toMillis
|
||||
case (fromStart, _) => math.round(fromStart.toDouble / throttleInterval.toMillis).toInt
|
||||
}
|
||||
withClue(perThrottleInterval) {
|
||||
perThrottleInterval.forall { case (_, entries) => entries.size == 1 } should ===(true)
|
||||
|
|
@ -238,19 +239,19 @@ class FlowThrottleSpec extends StreamSpec("""
|
|||
}
|
||||
|
||||
"send elements downstream as soon as time comes" in assertAllStagesStopped {
|
||||
val throttleInterval = 500.millis
|
||||
val throttleInterval = 500.millis.dilated
|
||||
val elementsAndTimestampsMs = Source(1 to 5)
|
||||
.throttle(2, throttleInterval, _ => 2)
|
||||
.runFold(Nil: List[(Long, Int)]) { (acc, n) =>
|
||||
(System.nanoTime() / 1000000, n) :: acc
|
||||
}
|
||||
.futureValue(timeout(5.seconds))
|
||||
.futureValue(timeout(5.seconds.dilated))
|
||||
.reverse
|
||||
|
||||
val startMs = elementsAndTimestampsMs.head._1
|
||||
val elemsAndTimeFromStart = elementsAndTimestampsMs.map { case (ts, n) => (ts - startMs, n) }
|
||||
val perThrottleInterval = elemsAndTimeFromStart.groupBy {
|
||||
case (fromStart, _) => fromStart / throttleInterval.toMillis
|
||||
case (fromStart, _) => math.round(fromStart.toDouble / throttleInterval.toMillis).toInt
|
||||
}
|
||||
withClue(perThrottleInterval) {
|
||||
perThrottleInterval.forall { case (_, entries) => entries.size == 1 } should ===(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue