+ str Add java.time.Duration to akka-stream's javadsl.* (#24706)
* + str Add java.time.Duration to javadsl.Source * + str Add java.time.Duration to javadsl.Flow * + str Add java.time.Duration to javadsl.BidiFlow * + str Add java.time.Duration to javadsl.RestartSource,RestartFlow and RestartSink * + str Add java.time.Duration to javadsl.StreamConverters * + str Add java.time.Duration to javadsl.SubFlow * + str Add java.time.Duration to javadsl.SubSource * !stream Deprecate methods which previously accepts Scala's FiniteDuration.
This commit is contained in:
parent
8245c55bc9
commit
e98c77e976
7 changed files with 1853 additions and 26 deletions
|
|
@ -7,13 +7,13 @@ package akka.stream.javadsl
|
|||
import akka.NotUsed
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.japi.function
|
||||
import akka.japi.Util
|
||||
import akka.stream._
|
||||
import akka.util.ConstantFun
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.japi.Util
|
||||
import java.util.Comparator
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
|
|
@ -597,9 +597,34 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
|
||||
* IllegalArgumentException is thrown.
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def groupedWithin(n: Int, d: FiniteDuration): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new SubSource(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Chunk up this stream into groups of elements received within a time window,
|
||||
* or limited by the given number of elements, whatever happens first.
|
||||
* Empty groups will not be emitted if no elements are received from upstream.
|
||||
* The last group before end-of-stream will contain the buffered elements
|
||||
* since the previously emitted group.
|
||||
*
|
||||
* '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements
|
||||
*
|
||||
* '''Completes when''' upstream completes (emits last group)
|
||||
*
|
||||
* '''Cancels when''' downstream completes
|
||||
*
|
||||
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
|
||||
* IllegalArgumentException is thrown.
|
||||
*/
|
||||
def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
groupedWithin(n, d.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Chunk up this stream into groups of elements received within a time window,
|
||||
* or limited by the weight of the elements, whatever happens first.
|
||||
|
|
@ -618,9 +643,34 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
|
||||
* IllegalArgumentException is thrown.
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new SubSource(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava))
|
||||
|
||||
/**
|
||||
* Chunk up this stream into groups of elements received within a time window,
|
||||
* or limited by the weight of the elements, whatever happens first.
|
||||
* Empty groups will not be emitted if no elements are received from upstream.
|
||||
* The last group before end-of-stream will contain the buffered elements
|
||||
* since the previously emitted group.
|
||||
*
|
||||
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
|
||||
*
|
||||
* '''Completes when''' upstream completes (emits last group)
|
||||
*
|
||||
* '''Cancels when''' downstream completes
|
||||
*
|
||||
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
|
||||
* IllegalArgumentException is thrown.
|
||||
*/
|
||||
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
groupedWeightedWithin(maxWeight, costFn, d.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Discard the given number of elements at the beginning of the stream.
|
||||
* No elements will be dropped if `n` is zero or negative.
|
||||
|
|
@ -647,9 +697,27 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def dropWithin(d: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.dropWithin(d))
|
||||
|
||||
/**
|
||||
* Discard the elements received within the given duration at beginning of the stream.
|
||||
*
|
||||
* '''Emits when''' the specified time elapsed and a new upstream element arrives
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
dropWithin(d.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time,
|
||||
|
|
@ -733,9 +801,41 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
* @param of time to shift all messages
|
||||
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.delay(of, strategy))
|
||||
|
||||
/**
|
||||
* Shifts elements emission in time by a specified amount. It allows to store elements
|
||||
* in internal buffer while waiting for next element to be emitted. Depending on the defined
|
||||
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
|
||||
* there is no space available in the buffer.
|
||||
*
|
||||
* Delay precision is 10ms to avoid unnecessary timer scheduling cycles
|
||||
*
|
||||
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
|
||||
*
|
||||
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
|
||||
* * EmitEarly - strategy do not wait to emit element if buffer is full
|
||||
*
|
||||
* '''Backpressures when''' depending on OverflowStrategy
|
||||
* * Backpressure - backpressures when buffer is full
|
||||
* * DropHead, DropTail, DropBuffer - never backpressures
|
||||
* * Fail - fails the stream if buffer gets full
|
||||
*
|
||||
* '''Completes when''' upstream completes and buffered elements has been drained
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @param of time to shift all messages
|
||||
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
|
||||
*/
|
||||
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
delay(of.asScala, strategy)
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover allows to send last element on failure and gracefully complete the stream
|
||||
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||
|
|
@ -858,9 +958,33 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels or timer fires
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def takeWithin(d: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.takeWithin(d))
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after the given
|
||||
* duration. Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
*
|
||||
* Note that this can be combined with [[#take]] to limit the number of elements
|
||||
* within the duration.
|
||||
*
|
||||
* '''Emits when''' an upstream element arrives
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes or timer fires
|
||||
*
|
||||
* '''Cancels when''' downstream cancels or timer fires
|
||||
*/
|
||||
def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
takeWithin(d.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
|
||||
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
|
||||
|
|
@ -1317,9 +1441,28 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def initialTimeout(timeout: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.initialTimeout(timeout))
|
||||
|
||||
/**
|
||||
* If the first element has not passed through this stage before the provided timeout, the stream is failed
|
||||
* with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* '''Emits when''' upstream emits an element
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
initialTimeout(timeout.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the completion of the stream does not happen until the provided timeout, the stream is failed
|
||||
* with a [[java.util.concurrent.TimeoutException]].
|
||||
|
|
@ -1332,9 +1475,28 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def completionTimeout(timeout: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.completionTimeout(timeout))
|
||||
|
||||
/**
|
||||
* If the completion of the stream does not happen until the provided timeout, the stream is failed
|
||||
* with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* '''Emits when''' upstream emits an element
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
completionTimeout(timeout.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the time between two processed elements exceeds the provided timeout, the stream is failed
|
||||
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
|
||||
|
|
@ -1348,9 +1510,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def idleTimeout(timeout: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.idleTimeout(timeout))
|
||||
|
||||
/**
|
||||
* If the time between two processed elements exceeds the provided timeout, the stream is failed
|
||||
* with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
|
||||
* so the resolution of the check is one period (equals to timeout value).
|
||||
*
|
||||
* '''Emits when''' upstream emits an element
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
idleTimeout(timeout.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
|
||||
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
|
||||
|
|
@ -1364,9 +1546,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.backpressureTimeout(timeout))
|
||||
|
||||
/**
|
||||
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
|
||||
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
|
||||
* so the resolution of the check is one period (equals to timeout value).
|
||||
*
|
||||
* '''Emits when''' upstream emits an element
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
backpressureTimeout(timeout.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
|
||||
* stage attempts to maintains a base rate of emitted elements towards the downstream.
|
||||
|
|
@ -1384,9 +1586,33 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create()))
|
||||
|
||||
/**
|
||||
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
|
||||
* stage attempts to maintains a base rate of emitted elements towards the downstream.
|
||||
*
|
||||
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
|
||||
* do not accumulate during this period.
|
||||
*
|
||||
* Upstream elements are always preferred over injected elements.
|
||||
*
|
||||
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
keepAlive(maxIdle.asScala, injectedElem)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
|
||||
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
|
||||
|
|
@ -1423,10 +1649,54 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* @see [[#throttleEven]]
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
|
||||
mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
|
||||
new SubSource(delegate.throttle(elements, per, maximumBurst, mode))
|
||||
|
||||
/**
|
||||
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
|
||||
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
|
||||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element costs If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
|
||||
*
|
||||
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
|
||||
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
|
||||
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
|
||||
*
|
||||
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
|
||||
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
|
||||
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
|
||||
* events being evenly spread with some small interval (30 milliseconds or less).
|
||||
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
|
||||
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
|
||||
*
|
||||
* '''Emits when''' upstream emits an element and configured time per each element elapsed
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @see [[#throttleEven]]
|
||||
*/
|
||||
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
|
||||
mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
throttle(elements, per.asScala, maximumBurst, mode)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends elements downstream with speed limited to `cost/per`. Cost is
|
||||
* calculating for each element individually by calling `calculateCost` function.
|
||||
|
|
@ -1466,10 +1736,57 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* @see [[#throttleEven]]
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
|
||||
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
|
||||
new SubSource(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode))
|
||||
|
||||
/**
|
||||
* Sends elements downstream with speed limited to `cost/per`. Cost is
|
||||
* calculating for each element individually by calling `calculateCost` function.
|
||||
* This combinator works for streams when elements have different cost(length).
|
||||
* Streams of `ByteString` for example.
|
||||
*
|
||||
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
|
||||
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
|
||||
* to allow some burstiness. Whenever stream wants to send an element, it takes as many
|
||||
* tokens from the bucket as element costs. If there isn't any, throttle waits until the
|
||||
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
|
||||
* to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started.
|
||||
*
|
||||
* Parameter `mode` manages behavior when upstream is faster than throttle rate:
|
||||
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
|
||||
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
|
||||
* cannot emit elements that cost more than the maximumBurst
|
||||
*
|
||||
* It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing
|
||||
* the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce
|
||||
* most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
|
||||
*
|
||||
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
|
||||
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
|
||||
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
|
||||
* events being evenly spread with some small interval (30 milliseconds or less).
|
||||
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
|
||||
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
|
||||
*
|
||||
* '''Emits when''' upstream emits an element and configured time per each element elapsed
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @see [[#throttleEven]]
|
||||
*/
|
||||
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
|
||||
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
throttle(cost, per.asScala, maximumBurst, costCalculation, mode)
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a simplified version of throttle that spreads events evenly across the given time interval.
|
||||
*
|
||||
|
|
@ -1480,6 +1797,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
* [[throttle()]] with maximumBurst attribute.
|
||||
* @see [[#throttle]]
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
|
||||
new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode))
|
||||
|
||||
|
|
@ -1493,10 +1812,43 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
* [[throttle()]] with maximumBurst attribute.
|
||||
* @see [[#throttle]]
|
||||
*/
|
||||
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
throttleEven(elements, per.asScala, mode)
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a simplified version of throttle that spreads events evenly across the given time interval.
|
||||
*
|
||||
* Use this combinator when you need just slow down a stream without worrying about exact amount
|
||||
* of time between events.
|
||||
*
|
||||
* If you want to be sure that no time interval has no more than specified number of events you need to use
|
||||
* [[throttle()]] with maximumBurst attribute.
|
||||
* @see [[#throttle]]
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def throttleEven(cost: Int, per: FiniteDuration,
|
||||
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
|
||||
new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode))
|
||||
|
||||
/**
|
||||
* This is a simplified version of throttle that spreads events evenly across the given time interval.
|
||||
*
|
||||
* Use this combinator when you need just slow down a stream without worrying about exact amount
|
||||
* of time between events.
|
||||
*
|
||||
* If you want to be sure that no time interval has no more than specified number of events you need to use
|
||||
* [[throttle()]] with maximumBurst attribute.
|
||||
* @see [[#throttle]]
|
||||
*/
|
||||
def throttleEven(cost: Int, per: java.time.Duration,
|
||||
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
throttleEven(cost, per.asScala, costCalculation, mode)
|
||||
}
|
||||
|
||||
/**
|
||||
* Detaches upstream demand from downstream demand without detaching the
|
||||
* stream rates; in other words acts like a buffer of size 1.
|
||||
|
|
@ -1522,9 +1874,27 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
@Deprecated
|
||||
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
|
||||
def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.initialDelay(delay))
|
||||
|
||||
/**
|
||||
* Delays the initial element by the specified duration.
|
||||
*
|
||||
* '''Emits when''' upstream emits an element if the initial delay is already elapsed
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = {
|
||||
import akka.util.JavaDurationConverters._
|
||||
initialDelay(delay.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the attributes of this [[Source]] to the given ones and seal the list
|
||||
* of attributes. This means that further calls will not be able to remove these
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue