From f7d23b508bef3173a380c9d7fff9d94147977093 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sat, 23 Jan 2016 17:55:03 -0500 Subject: [PATCH] =str #18890 fix throttle --- .../scala/akka/stream/impl/ConstantFun.scala | 2 + .../scala/akka/stream/impl/Throttle.scala | 44 +++++++++++-------- .../main/scala/akka/stream/javadsl/Flow.scala | 2 +- .../scala/akka/stream/javadsl/Source.scala | 2 +- .../scala/akka/stream/javadsl/SubFlow.scala | 2 +- .../scala/akka/stream/javadsl/SubSource.scala | 2 +- .../scala/akka/stream/scaladsl/Flow.scala | 4 +- 7 files changed, 33 insertions(+), 25 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index cd152ba336..f6c14a4227 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -24,4 +24,6 @@ private[akka] object ConstantFun { val zeroLong = (_: Any) ⇒ 0L val oneLong = (_: Any) ⇒ 1L + + val oneInt = (_: Any) ⇒ 1 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index c259dafff7..230926e79a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -9,7 +9,6 @@ import akka.stream.stage._ import akka.stream._ import scala.concurrent.duration.{ FiniteDuration, _ } -import scala.util.control.NonFatal /** * INTERNAL API @@ -26,10 +25,10 @@ private[stream] class Throttle[T](cost: Int, override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { var willStop = false - var lastTokens: Long = 0 - var previousTime: Long = 0 + var lastTokens: Long = maximumBurst + var previousTime: Long = now() - val speed = ((cost.toDouble / per.toMillis) * 1024 * 1024).toLong + val speed = ((cost.toDouble / per.toNanos) * 1073741824).toLong val timerName: String = "ThrottleTimer" var currentElement: Option[T] = None @@ -42,21 +41,29 @@ private[stream] class Throttle[T](cost: Int, else completeStage() override def onPush(): Unit = { - val timeElapsed = now() - previousTime - val currentTokens = Math.min(timeElapsed * speed + lastTokens, scaledMaximumBurst) val elem = grab(in) val elementCost = scale(costCalculation(elem)) - if (currentTokens < elementCost) - mode match { - case Shaping ⇒ - currentElement = Some(elem) - scheduleOnce(timerName, ((elementCost - currentTokens) / speed).millis) - case Enforcing ⇒ failStage(new RateExceededException("Maximum throttle throughput exceeded")) - } - else { - lastTokens = currentTokens - elementCost - previousTime = now() + + if (lastTokens >= elementCost) { + lastTokens -= elementCost push(out, elem) + } else { + val currentTime = now() + val currentTokens = Math.min((currentTime - previousTime) * speed + lastTokens, scaledMaximumBurst) + if (currentTokens < elementCost) + mode match { + case Shaping ⇒ + currentElement = Some(elem) + val waitTime = (elementCost - currentTokens) / speed + previousTime = currentTime + waitTime + scheduleOnce(timerName, waitTime.nanos) + case Enforcing ⇒ failStage(new RateExceededException("Maximum throttle throughput exceeded")) + } + else { + lastTokens = currentTokens - elementCost + previousTime = currentTime + push(out, elem) + } } } }) @@ -64,7 +71,6 @@ private[stream] class Throttle[T](cost: Int, override protected def onTimer(key: Any): Unit = { push(out, currentElement.get) currentElement = None - previousTime = now() lastTokens = 0 if (willStop) completeStage() } @@ -75,9 +81,9 @@ private[stream] class Throttle[T](cost: Int, override def preStart(): Unit = previousTime = now() - private def now(): Long = System.currentTimeMillis() + private def now(): Long = System.nanoTime() - private def scale(e: Int): Long = e.toLong << 20 + private def scale(e: Int): Long = e.toLong << 30 } override def toString = "Throttle" diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 187bcc1abf..1505ecd57d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1569,7 +1569,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstyness. Whenever stream wants to send an element, it takes as many * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. + * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1280bef25d..96fc23335a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1709,7 +1709,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstyness. Whenever stream wants to send an element, it takes as many * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. + * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 7b85c1e82a..3d9236b10b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1119,7 +1119,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstyness. Whenever stream wants to send an element, it takes as many * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. + * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 85bad32c8f..ae682fcb45 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1117,7 +1117,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstyness. Whenever stream wants to send an element, it takes as many * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. + * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2657d267d0..65a59b8139 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1432,7 +1432,7 @@ trait FlowOps[+Out, +Mat] { * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstyness. Whenever stream wants to send an element, it takes as many * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. + * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate @@ -1448,7 +1448,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] = - throttle(elements, per, maximumBurst, _ ⇒ 1, mode) + throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode) /** * Sends elements downstream with speed limited to `cost/per`. Cost is