From d815ed1b5e68bc6097fbfabd48cedfc74b209853 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 25 Feb 2016 11:19:52 +0100 Subject: [PATCH] 19862 Token bucket reimplemented --- .../scala/akka/util/TokenBucketSpec.scala | 253 ++++++++++++++++++ .../main/scala/akka/util/TokenBucket.scala | 88 ++++++ .../stream/scaladsl/FlowThrottleSpec.scala | 17 ++ .../scala/akka/stream/impl/Throttle.scala | 128 ++++----- .../main/scala/akka/stream/javadsl/Flow.scala | 7 + .../scala/akka/stream/javadsl/Source.scala | 7 + .../scala/akka/stream/javadsl/SubFlow.scala | 7 + .../scala/akka/stream/scaladsl/Flow.scala | 7 + 8 files changed, 436 insertions(+), 78 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/util/TokenBucketSpec.scala create mode 100644 akka-actor/src/main/scala/akka/util/TokenBucket.scala diff --git a/akka-actor-tests/src/test/scala/akka/util/TokenBucketSpec.scala b/akka-actor-tests/src/test/scala/akka/util/TokenBucketSpec.scala new file mode 100644 index 0000000000..a02c99b394 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/TokenBucketSpec.scala @@ -0,0 +1,253 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.util + +import akka.testkit.AkkaSpec + +import scala.util.Random + +class TokenBucketSpec extends AkkaSpec { + + class TestBucket(_cap: Long, _period: Long) extends TokenBucket(_cap, _period) { + var currentTime: Long = 0L + } + + "A Token Bucket" must { + + "start full" in { + val bucket = new TestBucket(10, 1) + bucket.init() + + bucket.offer(1) should ===(0) + bucket.offer(1) should ===(0) + bucket.offer(1) should ===(0) + bucket.offer(7) should ===(0) + + bucket.offer(3) should ===(3) + } + + "calculate correctly with different rates and capacities" in { + val bucketRate2 = new TestBucket(10, 2) + bucketRate2.init() + + bucketRate2.offer(5) should ===(0) + bucketRate2.offer(5) should ===(0) + bucketRate2.offer(5) should ===(10) + + val bucketRate3 = new TestBucket(8, 3) + bucketRate3.init() + bucketRate3.offer(5) should ===(0) + bucketRate3.offer(5) should ===(6) + + bucketRate3.currentTime = 6 + bucketRate3.offer(3) should ===(9) + } + + "allow sending elements larger than capacity" in { + val bucket = new TestBucket(10, 2) + bucket.init() + + bucket.offer(5) should ===(0) + bucket.offer(20) should ===(30) + + bucket.currentTime = 30 + bucket.offer(1) should ===(2) + + bucket.currentTime = 34 + bucket.offer(1) should ===(0) + bucket.offer(1) should ===(2) + } + + "work with zero capacity" in { + val bucket = new TestBucket(0, 2) + bucket.init() + + bucket.offer(10) should ===(20) + + bucket.currentTime = 40 + bucket.offer(10) should ===(20) + } + + "not delay if rate is higher than production" in { + val bucket = new TestBucket(1, 10) + bucket.init() + + for (time ← 0 to 100 by 10) { + bucket.currentTime = time + bucket.offer(1) should ===(0) + } + + } + + "maintain maximum capacity" in { + val bucket = new TestBucket(10, 1) + bucket.init() + bucket.offer(10) should ===(0) + + bucket.currentTime = 100000 + bucket.offer(20) should ===(10) + } + + "work if currentTime is negative" in { + val bucket = new TestBucket(10, 1) + bucket.currentTime = -100 // Must be set before init()! + bucket.init() + + bucket.offer(5) should ===(0) + bucket.offer(10) should ===(5) + + bucket.currentTime += 10 + + bucket.offer(5) should ===(0) + } + + "work if currentTime wraps over" in { + val bucket = new TestBucket(10, 1) + bucket.currentTime = Long.MaxValue - 5 // Must be set before init()! 
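+      // Only time differences (now - lastUpdate) are used internally, so a Long wrap-around of the
+      // clock (as can happen with System.nanoTime-style sources) does not disturb the calculation.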
+ bucket.init() + + bucket.offer(5) should ===(0) + bucket.offer(10) should ===(5) + + bucket.currentTime += 10 + + bucket.offer(5) should ===(0) + } + + "(attempt to) maintain equal time between token renewal intervals" in { + val bucket = new TestBucket(5, 3) + bucket.init() + + bucket.offer(10) should ===(15) + + bucket.currentTime = 16 + // At this point there is no token in the bucket (we consumed it at T15) but the next token will arrive at T18! + // A naive calculation would consider that there is 1 token needed hence we need to wait 3 units, but in fact + // we only need to wait 2 units, otherwise we shift the whole token arrival sequence resulting in lower rates. + // + // 0 3 9 12 15 18 21 24 27 + // +---+---+---+---+---+---+---+---+ + // ^ ^ + // emitted here --+ +---- currently here (T16) + // + + bucket.offer(1) should ===(2) + + bucket.currentTime = 19 + // At 18 bucket is empty, and so is at 19. For a cost of 2 we need to wait until T24 which is 5 units. + // + // 0 3 9 12 15 18 21 24 27 + // +---+---+---+---+---+---+---+---+ + // ^ ^ + // emptied here --+ +---- currently here (T19) + // + bucket.offer(2) should ===(5) + + // Another case + val bucket2 = new TestBucket(10, 3) + bucket2.init() + + bucket2.currentTime = 4 + bucket2.offer(6) should ===(0) + + // 4 tokens remain and new tokens arrive at T6 and T9 so here we have 6 tokens remaining. + // We need 1 more, which will arrive at T12 + bucket2.currentTime = 10 + bucket2.offer(7) should ===(2) + } + + "work with cost of zero" in { + val bucket = new TestBucket(10, 1) + bucket.init() + + // Can be called any number of times + bucket.offer(0) + bucket.offer(0) + bucket.offer(0) + + bucket.offer(10) should ===(0) + + // Bucket is empty now + // Still can be called any number of times + bucket.offer(0) + bucket.offer(0) + bucket.offer(0) + } + + "work with very slow rates" in { + val T = Long.MaxValue >> 10 + val bucket = new TestBucket(10, T) + bucket.init() + + bucket.offer(20) should ===(10 * T) + bucket.currentTime += 10 * T + + // Collect 5 tokens + bucket.currentTime += 5 * T + + bucket.offer(4) should ===(0) + bucket.offer(2) should ===(T) + } + + "behave exactly as the ideal (naive) token bucket if offer is called with perfect timing" in { + val Debug = false + + for { + capacity ← List(0, 1, 5, 10) + period ← List(1, 3, 5) + arrivalPeriod ← List(1, 3, 5) + startTime ← List(Long.MinValue, -1L, 0L, Long.MaxValue) + maxCost ← List(1, 5, 10) + } { + + val bucket = new TestBucket(capacity, period) + bucket.currentTime = startTime + bucket.init() + + var idealBucket = capacity + var untilNextTick = period + var untilNextElement = Random.nextInt(arrivalPeriod) + 1 + var nextEmit = 0L + var delaying = false + + for (time ← 0 to 1000) { + if (untilNextTick == 0) { + untilNextTick = period + idealBucket = math.min(idealBucket + 1, capacity) + } + + if (Debug) println(s"T:$time bucket:$idealBucket") + + if (delaying && idealBucket == 0) { + // Actual emit time should equal to what the optimized token bucket calculates + time should ===(nextEmit) + untilNextElement = time + Random.nextInt(arrivalPeriod) + if (Debug) println(s" EMITTING") + delaying = false + } + + if (untilNextElement == 0) { + // Allow cost of zer + val cost = Random.nextInt(maxCost + 1) + idealBucket -= cost // This can go negative + bucket.currentTime = startTime + time + val delay = bucket.offer(cost) + nextEmit = time + delay + if (Debug) println(s" ARRIVAL cost: $cost at: $nextEmit") + if (delay == 0) { + (idealBucket >= 0) should be(true) + 
untilNextElement = time + Random.nextInt(arrivalPeriod) + } else delaying = true + } + + untilNextTick -= 1 + untilNextElement -= 1 + } + } + + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/util/TokenBucket.scala b/akka-actor/src/main/scala/akka/util/TokenBucket.scala new file mode 100644 index 0000000000..f3bb35f0b6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/TokenBucket.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.util + +/** + * INTERNAL API + */ +private[akka] abstract class TokenBucket(capacity: Long, nanosBetweenTokens: Long) { + require(capacity >= 0, "Capacity must be non-negative.") + require(nanosBetweenTokens > 0, "Time between tokens must be larger than zero nanoseconds.") + + private[this] var availableTokens: Long = _ + private[this] var lastUpdate: Long = _ + + /** + * This method must be called before the token bucket can be used. + */ + def init(): Unit = { + availableTokens = capacity + lastUpdate = currentTime + } + + /** + * The current time in nanos. The returned value is monotonic, might wrap over and has no relationship with wall-clock. + * + * @return return the current time in nanos as a Long. + */ + def currentTime: Long + + /** + * Call this (side-effecting) method whenever an element should be passed through the token-bucket. This method + * will return the number of nanoseconds the element needs to be delayed to conform with the token bucket parameters. + * Returns zero if the element can be emitted immediately. The method does not handle overflow, if an element is to + * be delayed longer in nanoseconds than what can be represented as a positive Long then an undefined value is returned. + * + * If a non-zero value is returned, it is the responsibility of the caller to not call this method before the + * returned delay has been elapsed (but can be called later). This class does not check or protect against early + * calls. + * + * @param cost How many tokens the element costs. Can be larger than the capacity of the bucket. + * @return + */ + def offer(cost: Long): Long = { + if (cost < 0) throw new IllegalArgumentException("Cost must be non-negative") + + val now = currentTime + val timeElapsed = now - lastUpdate + + val tokensArrived = + // Was there even a tick since last time? + if (timeElapsed >= nanosBetweenTokens) { + // only one tick elapsed + if (timeElapsed < nanosBetweenTokens * 2) { + lastUpdate += nanosBetweenTokens + 1 + } else { + // Ok, no choice, do the slow integer division + val tokensArrived = timeElapsed / nanosBetweenTokens + lastUpdate += tokensArrived * nanosBetweenTokens + tokensArrived + } + } else 0 + + availableTokens = math.min(availableTokens + tokensArrived, capacity) + + if (cost <= availableTokens) { + availableTokens -= cost + 0 + } else { + val remainingCost = cost - availableTokens + // Tokens always arrive at exact multiples of the token generation period, we must account for that + val timeSinceTokenArrival = now - lastUpdate + val delay = remainingCost * nanosBetweenTokens - timeSinceTokenArrival + availableTokens = 0 + lastUpdate = now + delay + delay + } + } + +} + +/** + * Default implementation of [[TokenBucket]] that uses `System.nanoTime` as the time source. 
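+ *
+ * A minimal usage sketch (illustrative only, the values below are made up):
+ *
+ * {{{
+ *   val bucket = new NanoTimeTokenBucket(100, 10L * 1000 * 1000) // capacity 100 tokens, one token per 10 ms
+ *   bucket.init()                                                // must be called before the first offer()
+ *   val delayNanos = bucket.offer(1)                             // 0 means the element may be emitted now
+ *   // if non-zero, the caller must wait delayNanos before emitting and before calling offer() again
+ * }}}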
+ */ +final class NanoTimeTokenBucket(_cap: Long, _period: Long) extends TokenBucket(_cap, _period) { + override def currentTime: Long = System.nanoTime() +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index 3368a9523e..5c50fccab4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -29,6 +29,23 @@ class FlowThrottleSpec extends AkkaSpec { .expectComplete() } + "accept very high rates" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(1, 1 nanos, 0, ThrottleMode.Shaping) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1, 2, 3, 4, 5) + .expectComplete() + } + + "accept very low rates" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(1, 100.days, 1, ThrottleMode.Shaping) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1) + .expectNoMsg(100.millis) + .cancel() // We won't wait 100 days, sorry + } + "emit single element per tick" in Utils.assertAllStagesStopped { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index b252e333fb..42cb4023f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -7,36 +7,13 @@ import akka.stream.ThrottleMode.{ Enforcing, Shaping } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.stage._ import akka.stream._ +import akka.util.NanoTimeTokenBucket import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API */ -private[stream] object Throttle { - - val miniTokenBits = 30 - - private def tokenToMiniToken(e: Int): Long = e.toLong << Throttle.miniTokenBits -} - -/** - * INTERNAL API - */ -/* - * This class tracks a token bucket in an efficient way. - * - * For accuracy, instead of tracking integer tokens the implementation tracks "miniTokens" which are 1/2^30 fraction - * of a token. This allows us to track token replenish rate as miniTokens/nanosecond which allows us to use simple - * arithmetic without division and also less inaccuracy due to rounding on token count caculation. - * - * The replenish amount, and hence the current time is only queried if the bucket does not hold enough miniTokens, in - * other words, replenishing the bucket is *on-need*. In addition, to compensate scheduler inaccuracy, the implementation - * calculates the ideal "previous time" explicitly, not relying on the scheduler to tick at that time. This means that - * when the scheduler actually ticks, some time has been elapsed since the calculated ideal tick time, and those tokens - * are added to the bucket as any calculation is always relative to the ideal tick time. 
- * - */ private[stream] class Throttle[T](cost: Int, per: FiniteDuration, maximumBurst: Int, @@ -44,69 +21,64 @@ private[stream] class Throttle[T](cost: Int, mode: ThrottleMode) extends SimpleLinearGraphStage[T] { require(cost > 0, "cost must be > 0") - require(per.toMillis > 0, "per time must be > 0") + require(per.toNanos > 0, "per time must be > 0") require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") + require(per.toNanos >= cost, "Rates larger than 1 unit / nanosecond are not supported") - private val maximumBurstMiniTokens = Throttle.tokenToMiniToken(maximumBurst) - private val miniTokensPerNanos = (Throttle.tokenToMiniToken(cost).toDouble / per.toNanos).toLong + // There is some loss of precision here because of rounding, but this only happens if nanosBetweenTokens is very + // small which is usually at rates where that precision is highly unlikely anyway as the overhead of this stage + // is likely higher than the required accuracy interval. + private val nanosBetweenTokens = per.toNanos / cost private val timerName: String = "ThrottleTimer" override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + private val tokenBucket = new NanoTimeTokenBucket(maximumBurst, nanosBetweenTokens) + var willStop = false - var previousMiniTokens: Long = maximumBurstMiniTokens - var previousNanos: Long = System.nanoTime() - - var currentElement: Option[T] = None - - setHandler(in, new InHandler { - - override def onUpstreamFinish(): Unit = - if (isAvailable(out) && isTimerActive(timerName)) willStop = true - else completeStage() - - override def onPush(): Unit = { - val elem = grab(in) - val elementCostMiniTokens = Throttle.tokenToMiniToken(costCalculation(elem)) - - if (previousMiniTokens >= elementCostMiniTokens) { - previousMiniTokens -= elementCostMiniTokens - push(out, elem) - } else { - val currentNanos = System.nanoTime() - val currentMiniTokens = Math.min( - (currentNanos - previousNanos) * miniTokensPerNanos + previousMiniTokens, - maximumBurstMiniTokens) - - if (currentMiniTokens < elementCostMiniTokens) - mode match { - case Shaping ⇒ - currentElement = Some(elem) - val waitNanos = (elementCostMiniTokens - currentMiniTokens) / miniTokensPerNanos - previousNanos = currentNanos + waitNanos - scheduleOnce(timerName, waitNanos.nanos) - case Enforcing ⇒ failStage(new RateExceededException("Maximum throttle throughput exceeded")) - } - else { - previousMiniTokens = currentMiniTokens - elementCostMiniTokens - previousNanos = currentNanos - push(out, elem) - } - } - } - }) - - override protected def onTimer(key: Any): Unit = { - push(out, currentElement.get) - currentElement = None - previousMiniTokens = 0 - if (willStop) completeStage() + var currentElement: T = _ + val enforcing = mode match { + case Enforcing ⇒ true + case Shaping ⇒ false } - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + override def preStart(): Unit = tokenBucket.init() - override def preStart(): Unit = previousNanos = System.nanoTime() + // This scope is here just to not retain an extra reference to the handler below. + // We can't put this code into preRestart() because setHandler() must be called before that. 
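+    // The single handler created inside serves both ports: onPush offers the element's cost to the
+    // token bucket and pushes immediately if no delay is needed, otherwise it either fails the stage
+    // (Enforcing) or parks the element and schedules a timer to emit it later (Shaping).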
+ { + val handler = new InHandler with OutHandler { + override def onUpstreamFinish(): Unit = + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + + override def onPush(): Unit = { + val elem = grab(in) + val cost = costCalculation(elem) + val delayNanos = tokenBucket.offer(cost) + + if (delayNanos == 0L) push(out, elem) + else { + if (enforcing) failStage(new RateExceededException("Maximum throttle throughput exceeded.")) + else { + currentElement = elem + scheduleOnce(timerName, delayNanos.nanos) + } + } + } + + override def onPull(): Unit = pull(in) + } + + setHandler(in, handler) + setHandler(out, handler) + // After this point, we no longer need the `handler` so it can just fall out of scope. + } + + override protected def onTimer(key: Any): Unit = { + push(out, currentElement) + currentElement = null.asInstanceOf[T] + if (willStop) completeStage() + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 027a866a59..510366e513 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1614,6 +1614,13 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9a3d4651df..ece94fbab4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1742,6 +1742,13 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. 
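+ * For example, throttling to 100 elements per second with a burst size of 10 starts the bucket with
+ * 10 tokens, so up to 10 elements can pass back-to-back before the regular pacing of one element per
+ * 10 milliseconds takes over.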
+ * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 89cf7a1660..c0cde79c27 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1153,6 +1153,13 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f77d572a3d..0a8fd75a60 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1479,6 +1479,13 @@ trait FlowOps[+Out, +Mat] { * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally * to their cost minus available tokens, meeting the target rate. * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing