=str #18890 fix throttle
This commit is contained in:
parent
5fc6902141
commit
f7d23b508b
7 changed files with 33 additions and 25 deletions
|
|
@ -24,4 +24,6 @@ private[akka] object ConstantFun {
|
|||
val zeroLong = (_: Any) ⇒ 0L
|
||||
|
||||
val oneLong = (_: Any) ⇒ 1L
|
||||
|
||||
val oneInt = (_: Any) ⇒ 1
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import akka.stream.stage._
|
|||
import akka.stream._
|
||||
|
||||
import scala.concurrent.duration.{ FiniteDuration, _ }
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -26,10 +25,10 @@ private[stream] class Throttle[T](cost: Int,
|
|||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
var willStop = false
|
||||
var lastTokens: Long = 0
|
||||
var previousTime: Long = 0
|
||||
var lastTokens: Long = maximumBurst
|
||||
var previousTime: Long = now()
|
||||
|
||||
val speed = ((cost.toDouble / per.toMillis) * 1024 * 1024).toLong
|
||||
val speed = ((cost.toDouble / per.toNanos) * 1073741824).toLong
|
||||
val timerName: String = "ThrottleTimer"
|
||||
|
||||
var currentElement: Option[T] = None
|
||||
|
|
@ -42,29 +41,36 @@ private[stream] class Throttle[T](cost: Int,
|
|||
else completeStage()
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val timeElapsed = now() - previousTime
|
||||
val currentTokens = Math.min(timeElapsed * speed + lastTokens, scaledMaximumBurst)
|
||||
val elem = grab(in)
|
||||
val elementCost = scale(costCalculation(elem))
|
||||
|
||||
if (lastTokens >= elementCost) {
|
||||
lastTokens -= elementCost
|
||||
push(out, elem)
|
||||
} else {
|
||||
val currentTime = now()
|
||||
val currentTokens = Math.min((currentTime - previousTime) * speed + lastTokens, scaledMaximumBurst)
|
||||
if (currentTokens < elementCost)
|
||||
mode match {
|
||||
case Shaping ⇒
|
||||
currentElement = Some(elem)
|
||||
scheduleOnce(timerName, ((elementCost - currentTokens) / speed).millis)
|
||||
val waitTime = (elementCost - currentTokens) / speed
|
||||
previousTime = currentTime + waitTime
|
||||
scheduleOnce(timerName, waitTime.nanos)
|
||||
case Enforcing ⇒ failStage(new RateExceededException("Maximum throttle throughput exceeded"))
|
||||
}
|
||||
else {
|
||||
lastTokens = currentTokens - elementCost
|
||||
previousTime = now()
|
||||
previousTime = currentTime
|
||||
push(out, elem)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
override protected def onTimer(key: Any): Unit = {
|
||||
push(out, currentElement.get)
|
||||
currentElement = None
|
||||
previousTime = now()
|
||||
lastTokens = 0
|
||||
if (willStop) completeStage()
|
||||
}
|
||||
|
|
@ -75,9 +81,9 @@ private[stream] class Throttle[T](cost: Int,
|
|||
|
||||
override def preStart(): Unit = previousTime = now()
|
||||
|
||||
private def now(): Long = System.currentTimeMillis()
|
||||
private def now(): Long = System.nanoTime()
|
||||
|
||||
private def scale(e: Int): Long = e.toLong << 20
|
||||
private def scale(e: Int): Long = e.toLong << 30
|
||||
}
|
||||
|
||||
override def toString = "Throttle"
|
||||
|
|
|
|||
|
|
@ -1569,7 +1569,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens.
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
|
|
|
|||
|
|
@ -1709,7 +1709,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens.
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
|
|
|
|||
|
|
@ -1119,7 +1119,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens.
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
|
|
|
|||
|
|
@ -1117,7 +1117,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens.
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
|
|
|
|||
|
|
@ -1432,7 +1432,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens.
|
||||
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
|
|
@ -1448,7 +1448,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] =
|
||||
throttle(elements, per, maximumBurst, _ ⇒ 1, mode)
|
||||
throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode)
|
||||
|
||||
/**
|
||||
* Sends elements downstream with speed limited to `cost/per`. Cost is
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue