Add throttle combinator without bucket size in parameters (#23808)

* +str Add throttle combinator without bucket size in parameters

* Add mima filter plus comment formatting

* Reviewed throttle messages across all classes

* move mima filter to 2.4.6

* change to throttleEven, plus grammar fixes

* fix formatting

* changed mima filter according to new API

* fixes according to feedback
Alexander Golubev 2017-11-17 06:20:12 -05:00 committed by Patrik Nordwall
parent b51d720b18
commit a7b1a6675b
8 changed files with 282 additions and 37 deletions


@ -29,7 +29,7 @@ class FlowThrottleSpec extends StreamSpec {
}
"accept very high rates" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 1.nanos, 0, ThrottleMode.Shaping)
Source(1 to 5).throttle(1, 1.nanos, 0, Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
@ -37,7 +37,7 @@ class FlowThrottleSpec extends StreamSpec {
}
"accept very low rates" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.days, 1, ThrottleMode.Shaping)
Source(1 to 5).throttle(1, 100.days, 1, Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1)
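
A minimal sketch, not part of this diff, of how the new combinator could be exercised in the same spec style; the test name and values are illustrative only:

"accept high rates through throttleEven" in Utils.assertAllStagesStopped {
  Source(1 to 5).throttleEven(1, 1.nanos, Shaping)
    .runWith(TestSink.probe[Int])
    .request(5)
    .expectNext(1, 2, 3, 4, 5)
    .expectComplete()
}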


@ -1,6 +1,9 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this")
# #23808 adding simplified throttle API
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttleEven")
# #23111 AsyncCallbacks to just-finishing stages can be lost
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Offer")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.QueueSource$Completion$")


@ -71,8 +71,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
override def onPull(): Unit = pull(in)
}
setHandler(in, handler)
setHandler(out, handler)
setHandlers(in, out, handler)
// After this point, we no longer need the `handler` so it can just fall out of scope.
}
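
The change above collapses two setHandler calls into a single setHandlers call, which works because one object implements both InHandler and OutHandler. A minimal, self-contained sketch of that pattern in a hypothetical pass-through GraphStage (not part of this diff):

import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// Hypothetical stage: forwards every element unchanged, registering one
// handler object for both the inlet and the outlet via setHandlers.
final class PassThrough[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("PassThrough.in")
  val out: Outlet[T] = Outlet("PassThrough.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val handler = new InHandler with OutHandler {
        override def onPush(): Unit = push(out, grab(in))
        override def onPull(): Unit = pull(in)
      }
      setHandlers(in, out, handler)
    }
}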


@ -2049,8 +2049,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
@ -2060,7 +2061,11 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
@ -2070,6 +2075,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
@ -2084,15 +2091,26 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -2100,11 +2118,40 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.
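
A minimal usage sketch for the new Java DSL variant, written here in Scala and not part of this diff; the object and value names are hypothetical. As the delegation above shows, throttleEven(elements, per, mode) behaves like throttle(elements, per, Integer.MAX_VALUE, mode), so the even spacing is best effort rather than a hard per-interval guarantee:

import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.javadsl.Flow
import scala.concurrent.duration._

object ThrottleEvenJavaDslSketch {
  // Limit throughput to 10 elements per second, pausing (Shaping) when upstream is faster.
  val evenlyThrottled: Flow[Int, Int, NotUsed] =
    Flow.create[Int]().throttleEven(10, 1.second, ThrottleMode.Shaping)
}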


@ -2107,8 +2107,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
@ -2118,7 +2119,11 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
@ -2128,6 +2133,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] =
@ -2142,15 +2149,26 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -2158,11 +2176,40 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per, Int.MaxValue, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.


@ -1344,13 +1344,25 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -1358,6 +1370,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
@ -1372,9 +1386,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
@ -1385,7 +1399,11 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
@ -1395,11 +1413,40 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.


@ -1337,13 +1337,25 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -1351,6 +1363,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
@ -1365,15 +1379,26 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -1381,11 +1406,40 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.


@ -1787,14 +1787,26 @@ trait FlowOps[+Out, +Mat] {
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -1802,6 +1814,8 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] =
throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode)
@ -1815,22 +1829,26 @@ trait FlowOps[+Out, +Mat] {
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream has just materialized and started.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
*
* WARNING: Be aware that throttle uses a scheduler to slow down the stream. The scheduler has a minimum interval between
* triggered pushes, which imposes a minimum pause between emitted elements and can slow the stream below the requested
* rate. This can happen when the burst is 0 and the rate is higher than 30 events per second. Consider another solution
* if you expect events to be evenly spread with small intervals (30 milliseconds or less).
* In other words, the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
@ -1838,11 +1856,41 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] =
via(new Throttle(cost, per, maximumBurst, costCalculation, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval. throttleEven uses
* a best-effort approach to meet the throttle rate.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): Repr[Out] =
throttle(elements, per, Int.MaxValue, ConstantFun.oneInt, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events, use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] =
via(new Throttle(cost, per, Int.MaxValue, costCalculation, mode))
/**
* Detaches upstream demand from downstream demand without detaching the
* stream rates; in other words acts like a buffer of size 1.
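
To close, a small runnable sketch of the cost-based scaladsl variant; the application, cost function, and rates are hypothetical and only illustrate the token-bucket cost semantics described above (elements whose cost exceeds the currently available tokens are delayed proportionally):

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ThrottleMode }
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object CostBasedThrottleSketch extends App {
  implicit val system: ActorSystem = ActorSystem("throttle-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical cost function: each string costs as many tokens as it has characters,
  // limiting the stream to roughly 1024 characters per second. throttleEven uses an
  // effectively unlimited burst (Int.MaxValue), so the spacing is best effort.
  Source(List("short", "a somewhat longer line", "x" * 2048))
    .throttleEven(1024, 1.second, (s: String) ⇒ s.length, ThrottleMode.Shaping)
    .runForeach(line ⇒ println(s"emitted ${line.length} characters"))
    .onComplete(_ ⇒ system.terminate())(system.dispatcher)
}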