19862 Token bucket reimplemented

This commit is contained in:
Endre Sándor Varga 2016-02-25 11:19:52 +01:00
parent 5545ff8fa7
commit d815ed1b5e
8 changed files with 436 additions and 78 deletions

View file

@ -0,0 +1,253 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
import akka.testkit.AkkaSpec
import scala.util.Random
class TokenBucketSpec extends AkkaSpec {
class TestBucket(_cap: Long, _period: Long) extends TokenBucket(_cap, _period) {
var currentTime: Long = 0L
}
"A Token Bucket" must {
"start full" in {
val bucket = new TestBucket(10, 1)
bucket.init()
bucket.offer(1) should ===(0)
bucket.offer(1) should ===(0)
bucket.offer(1) should ===(0)
bucket.offer(7) should ===(0)
bucket.offer(3) should ===(3)
}
"calculate correctly with different rates and capacities" in {
val bucketRate2 = new TestBucket(10, 2)
bucketRate2.init()
bucketRate2.offer(5) should ===(0)
bucketRate2.offer(5) should ===(0)
bucketRate2.offer(5) should ===(10)
val bucketRate3 = new TestBucket(8, 3)
bucketRate3.init()
bucketRate3.offer(5) should ===(0)
bucketRate3.offer(5) should ===(6)
bucketRate3.currentTime = 6
bucketRate3.offer(3) should ===(9)
}
"allow sending elements larger than capacity" in {
val bucket = new TestBucket(10, 2)
bucket.init()
bucket.offer(5) should ===(0)
bucket.offer(20) should ===(30)
bucket.currentTime = 30
bucket.offer(1) should ===(2)
bucket.currentTime = 34
bucket.offer(1) should ===(0)
bucket.offer(1) should ===(2)
}
"work with zero capacity" in {
val bucket = new TestBucket(0, 2)
bucket.init()
bucket.offer(10) should ===(20)
bucket.currentTime = 40
bucket.offer(10) should ===(20)
}
"not delay if rate is higher than production" in {
val bucket = new TestBucket(1, 10)
bucket.init()
for (time 0 to 100 by 10) {
bucket.currentTime = time
bucket.offer(1) should ===(0)
}
}
"maintain maximum capacity" in {
val bucket = new TestBucket(10, 1)
bucket.init()
bucket.offer(10) should ===(0)
bucket.currentTime = 100000
bucket.offer(20) should ===(10)
}
"work if currentTime is negative" in {
val bucket = new TestBucket(10, 1)
bucket.currentTime = -100 // Must be set before init()!
bucket.init()
bucket.offer(5) should ===(0)
bucket.offer(10) should ===(5)
bucket.currentTime += 10
bucket.offer(5) should ===(0)
}
"work if currentTime wraps over" in {
val bucket = new TestBucket(10, 1)
bucket.currentTime = Long.MaxValue - 5 // Must be set before init()!
bucket.init()
bucket.offer(5) should ===(0)
bucket.offer(10) should ===(5)
bucket.currentTime += 10
bucket.offer(5) should ===(0)
}
"(attempt to) maintain equal time between token renewal intervals" in {
val bucket = new TestBucket(5, 3)
bucket.init()
bucket.offer(10) should ===(15)
bucket.currentTime = 16
// At this point there is no token in the bucket (we consumed it at T15) but the next token will arrive at T18!
// A naive calculation would consider that there is 1 token needed hence we need to wait 3 units, but in fact
// we only need to wait 2 units, otherwise we shift the whole token arrival sequence resulting in lower rates.
//
// 0 3 9 12 15 18 21 24 27
// +---+---+---+---+---+---+---+---+
// ^ ^
// emitted here --+ +---- currently here (T16)
//
bucket.offer(1) should ===(2)
bucket.currentTime = 19
// At 18 bucket is empty, and so is at 19. For a cost of 2 we need to wait until T24 which is 5 units.
//
// 0 3 9 12 15 18 21 24 27
// +---+---+---+---+---+---+---+---+
// ^ ^
// emptied here --+ +---- currently here (T19)
//
bucket.offer(2) should ===(5)
// Another case
val bucket2 = new TestBucket(10, 3)
bucket2.init()
bucket2.currentTime = 4
bucket2.offer(6) should ===(0)
// 4 tokens remain and new tokens arrive at T6 and T9 so here we have 6 tokens remaining.
// We need 1 more, which will arrive at T12
bucket2.currentTime = 10
bucket2.offer(7) should ===(2)
}
"work with cost of zero" in {
val bucket = new TestBucket(10, 1)
bucket.init()
// Can be called any number of times
bucket.offer(0)
bucket.offer(0)
bucket.offer(0)
bucket.offer(10) should ===(0)
// Bucket is empty now
// Still can be called any number of times
bucket.offer(0)
bucket.offer(0)
bucket.offer(0)
}
"work with very slow rates" in {
val T = Long.MaxValue >> 10
val bucket = new TestBucket(10, T)
bucket.init()
bucket.offer(20) should ===(10 * T)
bucket.currentTime += 10 * T
// Collect 5 tokens
bucket.currentTime += 5 * T
bucket.offer(4) should ===(0)
bucket.offer(2) should ===(T)
}
"behave exactly as the ideal (naive) token bucket if offer is called with perfect timing" in {
val Debug = false
for {
capacity List(0, 1, 5, 10)
period List(1, 3, 5)
arrivalPeriod List(1, 3, 5)
startTime List(Long.MinValue, -1L, 0L, Long.MaxValue)
maxCost List(1, 5, 10)
} {
val bucket = new TestBucket(capacity, period)
bucket.currentTime = startTime
bucket.init()
var idealBucket = capacity
var untilNextTick = period
var untilNextElement = Random.nextInt(arrivalPeriod) + 1
var nextEmit = 0L
var delaying = false
for (time 0 to 1000) {
if (untilNextTick == 0) {
untilNextTick = period
idealBucket = math.min(idealBucket + 1, capacity)
}
if (Debug) println(s"T:$time bucket:$idealBucket")
if (delaying && idealBucket == 0) {
// Actual emit time should equal to what the optimized token bucket calculates
time should ===(nextEmit)
untilNextElement = time + Random.nextInt(arrivalPeriod)
if (Debug) println(s" EMITTING")
delaying = false
}
if (untilNextElement == 0) {
// Allow cost of zer
val cost = Random.nextInt(maxCost + 1)
idealBucket -= cost // This can go negative
bucket.currentTime = startTime + time
val delay = bucket.offer(cost)
nextEmit = time + delay
if (Debug) println(s" ARRIVAL cost: $cost at: $nextEmit")
if (delay == 0) {
(idealBucket >= 0) should be(true)
untilNextElement = time + Random.nextInt(arrivalPeriod)
} else delaying = true
}
untilNextTick -= 1
untilNextElement -= 1
}
}
}
}
}

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
/**
* INTERNAL API
*/
private[akka] abstract class TokenBucket(capacity: Long, nanosBetweenTokens: Long) {
require(capacity >= 0, "Capacity must be non-negative.")
require(nanosBetweenTokens > 0, "Time between tokens must be larger than zero nanoseconds.")
private[this] var availableTokens: Long = _
private[this] var lastUpdate: Long = _
/**
* This method must be called before the token bucket can be used.
*/
def init(): Unit = {
availableTokens = capacity
lastUpdate = currentTime
}
/**
* The current time in nanos. The returned value is monotonic, might wrap over and has no relationship with wall-clock.
*
* @return return the current time in nanos as a Long.
*/
def currentTime: Long
/**
* Call this (side-effecting) method whenever an element should be passed through the token-bucket. This method
* will return the number of nanoseconds the element needs to be delayed to conform with the token bucket parameters.
* Returns zero if the element can be emitted immediately. The method does not handle overflow, if an element is to
* be delayed longer in nanoseconds than what can be represented as a positive Long then an undefined value is returned.
*
* If a non-zero value is returned, it is the responsibility of the caller to not call this method before the
* returned delay has been elapsed (but can be called later). This class does not check or protect against early
* calls.
*
* @param cost How many tokens the element costs. Can be larger than the capacity of the bucket.
* @return
*/
def offer(cost: Long): Long = {
if (cost < 0) throw new IllegalArgumentException("Cost must be non-negative")
val now = currentTime
val timeElapsed = now - lastUpdate
val tokensArrived =
// Was there even a tick since last time?
if (timeElapsed >= nanosBetweenTokens) {
// only one tick elapsed
if (timeElapsed < nanosBetweenTokens * 2) {
lastUpdate += nanosBetweenTokens
1
} else {
// Ok, no choice, do the slow integer division
val tokensArrived = timeElapsed / nanosBetweenTokens
lastUpdate += tokensArrived * nanosBetweenTokens
tokensArrived
}
} else 0
availableTokens = math.min(availableTokens + tokensArrived, capacity)
if (cost <= availableTokens) {
availableTokens -= cost
0
} else {
val remainingCost = cost - availableTokens
// Tokens always arrive at exact multiples of the token generation period, we must account for that
val timeSinceTokenArrival = now - lastUpdate
val delay = remainingCost * nanosBetweenTokens - timeSinceTokenArrival
availableTokens = 0
lastUpdate = now + delay
delay
}
}
}
/**
* Default implementation of [[TokenBucket]] that uses `System.nanoTime` as the time source.
*/
final class NanoTimeTokenBucket(_cap: Long, _period: Long) extends TokenBucket(_cap, _period) {
override def currentTime: Long = System.nanoTime()
}

View file

@ -29,6 +29,23 @@ class FlowThrottleSpec extends AkkaSpec {
.expectComplete()
}
"accept very high rates" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 1 nanos, 0, ThrottleMode.Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
}
"accept very low rates" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.days, 1, ThrottleMode.Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1)
.expectNoMsg(100.millis)
.cancel() // We won't wait 100 days, sorry
}
"emit single element per tick" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()

View file

@ -7,36 +7,13 @@ import akka.stream.ThrottleMode.{ Enforcing, Shaping }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
import akka.stream._
import akka.util.NanoTimeTokenBucket
import scala.concurrent.duration.{ FiniteDuration, _ }
/**
* INTERNAL API
*/
private[stream] object Throttle {
val miniTokenBits = 30
private def tokenToMiniToken(e: Int): Long = e.toLong << Throttle.miniTokenBits
}
/**
* INTERNAL API
*/
/*
* This class tracks a token bucket in an efficient way.
*
* For accuracy, instead of tracking integer tokens the implementation tracks "miniTokens" which are 1/2^30 fraction
* of a token. This allows us to track token replenish rate as miniTokens/nanosecond which allows us to use simple
* arithmetic without division and also less inaccuracy due to rounding on token count caculation.
*
* The replenish amount, and hence the current time is only queried if the bucket does not hold enough miniTokens, in
* other words, replenishing the bucket is *on-need*. In addition, to compensate scheduler inaccuracy, the implementation
* calculates the ideal "previous time" explicitly, not relying on the scheduler to tick at that time. This means that
* when the scheduler actually ticks, some time has been elapsed since the calculated ideal tick time, and those tokens
* are added to the bucket as any calculation is always relative to the ideal tick time.
*
*/
private[stream] class Throttle[T](cost: Int,
per: FiniteDuration,
maximumBurst: Int,
@ -44,70 +21,65 @@ private[stream] class Throttle[T](cost: Int,
mode: ThrottleMode)
extends SimpleLinearGraphStage[T] {
require(cost > 0, "cost must be > 0")
require(per.toMillis > 0, "per time must be > 0")
require(per.toNanos > 0, "per time must be > 0")
require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")
require(per.toNanos >= cost, "Rates larger than 1 unit / nanosecond are not supported")
private val maximumBurstMiniTokens = Throttle.tokenToMiniToken(maximumBurst)
private val miniTokensPerNanos = (Throttle.tokenToMiniToken(cost).toDouble / per.toNanos).toLong
// There is some loss of precision here because of rounding, but this only happens if nanosBetweenTokens is very
// small which is usually at rates where that precision is highly unlikely anyway as the overhead of this stage
// is likely higher than the required accuracy interval.
private val nanosBetweenTokens = per.toNanos / cost
private val timerName: String = "ThrottleTimer"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val tokenBucket = new NanoTimeTokenBucket(maximumBurst, nanosBetweenTokens)
var willStop = false
var previousMiniTokens: Long = maximumBurstMiniTokens
var previousNanos: Long = System.nanoTime()
var currentElement: T = _
val enforcing = mode match {
case Enforcing true
case Shaping false
}
var currentElement: Option[T] = None
setHandler(in, new InHandler {
override def preStart(): Unit = tokenBucket.init()
// This scope is here just to not retain an extra reference to the handler below.
// We can't put this code into preRestart() because setHandler() must be called before that.
{
val handler = new InHandler with OutHandler {
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage()
override def onPush(): Unit = {
val elem = grab(in)
val elementCostMiniTokens = Throttle.tokenToMiniToken(costCalculation(elem))
val cost = costCalculation(elem)
val delayNanos = tokenBucket.offer(cost)
if (previousMiniTokens >= elementCostMiniTokens) {
previousMiniTokens -= elementCostMiniTokens
push(out, elem)
} else {
val currentNanos = System.nanoTime()
val currentMiniTokens = Math.min(
(currentNanos - previousNanos) * miniTokensPerNanos + previousMiniTokens,
maximumBurstMiniTokens)
if (currentMiniTokens < elementCostMiniTokens)
mode match {
case Shaping
currentElement = Some(elem)
val waitNanos = (elementCostMiniTokens - currentMiniTokens) / miniTokensPerNanos
previousNanos = currentNanos + waitNanos
scheduleOnce(timerName, waitNanos.nanos)
case Enforcing failStage(new RateExceededException("Maximum throttle throughput exceeded"))
}
if (delayNanos == 0L) push(out, elem)
else {
previousMiniTokens = currentMiniTokens - elementCostMiniTokens
previousNanos = currentNanos
push(out, elem)
if (enforcing) failStage(new RateExceededException("Maximum throttle throughput exceeded."))
else {
currentElement = elem
scheduleOnce(timerName, delayNanos.nanos)
}
}
}
})
override def onPull(): Unit = pull(in)
}
setHandler(in, handler)
setHandler(out, handler)
// After this point, we no longer need the `handler` so it can just fall out of scope.
}
override protected def onTimer(key: Any): Unit = {
push(out, currentElement.get)
currentElement = None
previousMiniTokens = 0
push(out, currentElement)
currentElement = null.asInstanceOf[T]
if (willStop) completeStage()
}
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
override def preStart(): Unit = previousNanos = System.nanoTime()
}
override def toString = "Throttle"

View file

@ -1614,6 +1614,13 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures

View file

@ -1742,6 +1742,13 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures

View file

@ -1153,6 +1153,13 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures

View file

@ -1479,6 +1479,13 @@ trait FlowOps[+Out, +Mat] {
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing