diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index 7c30112b2a..1bd36e898f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -29,7 +29,7 @@ class FlowThrottleSpec extends StreamSpec { } "accept very high rates" in Utils.assertAllStagesStopped { - Source(1 to 5).throttle(1, 1.nanos, 0, ThrottleMode.Shaping) + Source(1 to 5).throttle(1, 1.nanos, 0, Shaping) .runWith(TestSink.probe[Int]) .request(5) .expectNext(1, 2, 3, 4, 5) @@ -37,7 +37,7 @@ class FlowThrottleSpec extends StreamSpec { } "accept very low rates" in Utils.assertAllStagesStopped { - Source(1 to 5).throttle(1, 100.days, 1, ThrottleMode.Shaping) + Source(1 to 5).throttle(1, 100.days, 1, Shaping) .runWith(TestSink.probe[Int]) .request(5) .expectNext(1) diff --git a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes index fe984a3f4a..5d2732768e 100644 --- a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes @@ -1,6 +1,9 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this") +# #23808 adding simplified throttle API +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttleEven") + # #23111 AsyncCallbacks to just-finishing stages can be lost ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Offer") ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Completion$") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index ee7bfb1c7c..8e3daefc87 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -71,8 +71,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ } override def onPull(): Unit = pull(in) } - setHandler(in, handler) - setHandler(out, handler) + setHandlers(in, out, handler) // After this point, we no longer need the `handler` so it can just fall out of scope. } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index cc4d1365a4..f80d5afd95 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2049,8 +2049,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate @@ -2060,8 +2061,12 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). * - * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2070,6 +2075,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = @@ -2084,15 +2091,26 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * tokens from the bucket as element costs. If there isn't any, throttle waits until the * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally - * to their cost minus available tokens, meeting the target rate. + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -2100,11 +2118,40 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 68a933d00f..607b2f1031 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -2107,8 +2107,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate @@ -2118,8 +2119,12 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). * - * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2128,6 +2133,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = @@ -2142,15 +2149,26 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * tokens from the bucket as element costs. If there isn't any, throttle waits until the * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally - * to their cost minus available tokens, meeting the target rate. + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -2158,11 +2176,40 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(elements, per, Int.MaxValue, mode)) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index c4dac6041b..9cc47dd0f7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1344,13 +1344,25 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -1358,6 +1370,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = @@ -1372,9 +1386,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * tokens from the bucket as element costs. If there isn't any, throttle waits until the * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally - * to their cost minus available tokens, meeting the target rate. + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate @@ -1385,8 +1399,12 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). * - * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1395,11 +1413,40 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index fd9ebae68e..ae62ecb8b9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1337,13 +1337,25 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. + * tokens from the bucket as element costs If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -1351,6 +1363,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = @@ -1365,15 +1379,26 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * tokens from the bucket as element costs. If there isn't any, throttle waits until the * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally - * to their cost minus available tokens, meeting the target rate. + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -1381,11 +1406,40 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode)) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d1b678e218..7c549b598d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1787,14 +1787,26 @@ trait FlowOps[+Out, +Mat] { * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as number of elements. If there isn't any, throttle waits until the - * bucket accumulates enough tokens. Bucket is full when stream just materialized and started. + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -1802,6 +1814,8 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] = throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode) @@ -1815,22 +1829,26 @@ trait FlowOps[+Out, +Mat] { * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity * to allow some burstiness. Whenever stream wants to send an element, it takes as many - * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * tokens from the bucket as element costs. If there isn't any, throttle waits until the * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally - * to their cost minus available tokens, meeting the target rate. - * - * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing - * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce - * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). - * - * Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. * * Parameter `mode` manages behaviour when upstream is faster than throttle rate: * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing * cannot emit elements that cost more than the maximumBurst * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -1838,11 +1856,41 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = via(new Throttle(cost, per, maximumBurst, costCalculation, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. throttleEven using + * best effort approach to meet throttle rate. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): Repr[Out] = + throttle(elements, per, Int.MaxValue, ConstantFun.oneInt, mode) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = + via(new Throttle(cost, per, Int.MaxValue, costCalculation, mode)) + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1.