diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index c442f125f5..a35c82d7ae 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -92,8 +92,24 @@ object BidiFlow { * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers * the *joint* frequencies of the elements in both directions. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, NotUsed] = new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout)) + + /** + * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * There is a difference between this stage and having two idleTimeout Flows assembled into a BidiStage. + * If the timeout is configured to be 1 seconds, then this stage will not fail even though there are elements flowing + * every second in one direction, but no elements are flowing in the other direction. I.e. this stage considers + * the *joint* frequencies of the elements in both directions. + */ + def bidirectionalIdleTimeout[I, O](timeout: java.time.Duration): BidiFlow[I, I, O, O, NotUsed] = { + import akka.util.JavaDurationConverters._ + bidirectionalIdleTimeout(timeout.asScala) + } } final class BidiFlow[I1, O1, I2, O2, Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f13df70c97..6f5c2995ce 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1018,9 +1018,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered + * + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWithin(n, d.asScala) + } + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the weight of the elements, whatever happens first. @@ -1039,9 +1064,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWeightedWithin(maxWeight, costFn, d.asScala) + } + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined @@ -1067,9 +1117,41 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = new Flow(delegate.delay(of, strategy)) + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements have been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + delay(of.asScala, strategy) + } + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -1096,9 +1178,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + * + * '''Emits when''' the specified time elapsed and a new upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + dropWithin(d.asScala) + } + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time, including the first failed element iff inclusive is true @@ -1292,9 +1392,35 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * See also [[Flow.limit]], [[Flow.limitWeighted]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + * + * '''Emits when''' an upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or timer fires + * + * '''Cancels when''' downstream cancels or timer fires + * + * See also [[Flow.limit]], [[Flow.limitWeighted]] + */ + def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + takeWithin(d.asScala) + } + /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the @@ -2171,9 +2297,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.initialTimeout(timeout)) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels + */ + def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialTimeout(timeout.asScala) + } + /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. @@ -2186,9 +2331,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.completionTimeout(timeout)) + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels + */ + def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + completionTimeout(timeout.asScala) + } + /** * If the time between two processed elements exceeds the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -2202,9 +2366,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.idleTimeout(timeout)) + /** + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels + */ + def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + idleTimeout(timeout.asScala) + } + /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -2218,9 +2402,29 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def backpressureTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.backpressureTimeout(timeout)) + /** + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + backpressureTimeout(timeout.asScala) + } + /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. @@ -2238,9 +2442,33 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + keepAlive(maxIdle.asScala, injectedElem) + } + /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate * for emitting messages. This combinator works for streams where all elements have the same cost or length. @@ -2277,10 +2505,54 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(elements, per, maximumBurst, mode)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, + mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(elements, per.asScala, maximumBurst, mode) + } + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -2320,10 +2592,57 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(cost, per.asScala, maximumBurst, costCalculation, mode) + } + /** * This is a simplified version of throttle that spreads events evenly across the given time interval. * @@ -2334,6 +2653,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) @@ -2347,10 +2668,43 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(elements, per.asScala, mode) + } + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(cost, per.asScala, costCalculation, mode) + } + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. @@ -2394,9 +2748,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.initialDelay(delay)) + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay is already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialDelay(delay.asScala) + } + /** * Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala index 98a82b0542..be019ae46c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala @@ -38,6 +38,8 @@ object RestartSource { * In order to skip this additional delay pass in `0`. * @param sourceFactory A factory for producing the [[Source]] to wrap. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ @@ -45,6 +47,64 @@ object RestartSource { }.asJava } + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]] + * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sourceFactory.create().asScala + }.asJava + } + /** * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential * backoff. @@ -68,9 +128,37 @@ object RestartSource { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param sourceFactory A factory for producing the [[Source]] to wrap. */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will never emit a failure, since the failure of the wrapped [[Source]] is always handled by + * restarting. The wrapped [[Source]] can be cancelled by cancelling this [[Source]]. + * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted. + * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right + * after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ sourceFactory.create().asScala }.asJava } @@ -95,9 +183,39 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. * */ - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sourceFactory) + } + + /** + * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails using an exponential backoff. + * + * This [[Source]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion + * or failure of the wrapped [[Source]] is handled by restarting it. The wrapped [[Source]] can however be cancelled + * by cancelling this [[Source]]. When that happens, the wrapped [[Source]], if currently running will be cancelled, + * and it will not be restarted. This can be triggered simply by the downstream cancelling, or externally by + * introducing a [[KillSwitch]] right after this [[Source]] in the graph. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sourceFactory A factory for producing the [[Source]] to wrap. + * + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { + akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ sourceFactory.create().asScala }.asJava } @@ -124,11 +242,10 @@ object RestartSource { * @param sourceFactory A factory for producing the [[Source]] to wrap. * */ - def onFailuresWithBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def onFailuresWithBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, maxRestarts: Int, sourceFactory: Creator[Source[T, _]]): Source[T, NotUsed] = { - akka.stream.scaladsl.RestartSource.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - sourceFactory.create().asScala - }.asJava + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sourceFactory) } } @@ -165,6 +282,8 @@ object RestartSink { * In order to skip this additional delay pass in `0`. * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ @@ -172,6 +291,71 @@ object RestartSink { }.asJava } + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it. + * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that + * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered + * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the + * graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, sinkFactory) + } + + /** + * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Sink]] will not cancel as long as maxRestarts is not reached, since cancellation by the wrapped [[Sink]] + * is handled by restarting it. The wrapped [[Sink]] can however be completed by feeding a completion or error into + * this [[Sink]]. When that happens, the [[Sink]], if currently running, will terminate and will not be restarted. + * This can be triggered simply by the upstream completing, or externally by introducing a [[KillSwitch]] right + * before this [[Sink]] in the graph. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already + * sent may have been lost. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param sinkFactory A factory for producing the [[Sink]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { + akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + sinkFactory.create().asScala + }.asJava + } + /** * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential * backoff. @@ -198,11 +382,10 @@ object RestartSink { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param sinkFactory A factory for producing the [[Sink]] to wrap. */ - def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def withBackoff[T](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, maxRestarts: Int, sinkFactory: Creator[Sink[T, _]]): Sink[T, NotUsed] = { - akka.stream.scaladsl.RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - sinkFactory.create().asScala - }.asJava + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, sinkFactory) } } @@ -238,6 +421,8 @@ object RestartFlow { * In order to skip this additional delay pass in `0`. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor) { () ⇒ @@ -245,6 +430,69 @@ object RestartFlow { }.asJava } + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination + * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]] + * will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, + flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, flowFactory) + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential + * backoff. + * + * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + * completed. Any termination by the [[Flow]] before that time will be handled by restarting it as long as maxRestarts + * is not reached. Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's + * running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + flowFactory.create().asScala + }.asJava + } + /** * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential * backoff. @@ -270,9 +518,42 @@ object RestartFlow { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ - def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def withBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - akka.stream.scaladsl.RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ + import akka.util.JavaDurationConverters._ + withBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) + } + + /** + * Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts + * using an exponential backoff. + * + * This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that + * time will be handled by restarting it as long as maxRestarts is not reached. + * However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate + * the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted. + * + * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated, + * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure. + * + * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]]. + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff. + * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + * @param flowFactory A factory for producing the [[Flow]] to wrap. + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { + akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ flowFactory.create().asScala }.asJava } @@ -302,10 +583,9 @@ object RestartFlow { * Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. * @param flowFactory A factory for producing the [[Flow]] to wrap. */ - def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, + def onFailuresWithBackoff[In, Out](minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double, maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = { - akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () ⇒ - flowFactory.create().asScala - }.asJava + import akka.util.JavaDurationConverters._ + onFailuresWithBackoff(minBackoff.asScala, maxBackoff.asScala, randomFactor, maxRestarts, flowFactory) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2241088d7d..f49a71e4f0 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -7,14 +7,13 @@ package akka.stream.javadsl import java.util import java.util.Optional -import akka.util.{ ConstantFun, Timeout } -import akka.util.JavaDurationConverters._ -import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.stream.impl.{ LinearTraversalBuilder, SourceQueueAdapter } +import akka.util.{ ConstantFun, Timeout } +import akka.{ Done, NotUsed } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -205,14 +204,22 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def tick[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: O): javadsl.Source[O, Cancellable] = new Source(scaladsl.Source.tick(initialDelay, interval, tick)) /** - * Same as [[tick]], but accepts Java [[java.time.Duration]] instead of Scala ones. + * Elements are emitted periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. */ - def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = + def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = { + import akka.util.JavaDurationConverters._ Source.tick(initialDelay.asScala, interval.asScala, tick) + } /** * Create a `Source` with one element. @@ -1700,9 +1707,34 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered + * + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWithin(n, d.asScala) + } + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the weight of the elements, whatever happens first. @@ -1721,9 +1753,34 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWeightedWithin(maxWeight, costFn, d.asScala) + } + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined @@ -1749,9 +1806,41 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): Source[Out, Mat] = new Source(delegate.delay(of, strategy)) + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + delay(of.asScala, strategy) + } + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -1778,9 +1867,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def dropWithin(d: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + * + * '''Emits when''' the specified time elapsed and a new upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + dropWithin(d.asScala) + } + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time. Due to input buffering some elements may have been @@ -1855,9 +1962,33 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels or timer fires */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def takeWithin(d: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + * + * '''Emits when''' an upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or timer fires + * + * '''Cancels when''' downstream cancels or timer fires + */ + def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + takeWithin(d.asScala) + } + /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the @@ -2235,9 +2366,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.initialTimeout(timeout)) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels + */ + def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialTimeout(timeout.asScala) + } + /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. @@ -2250,9 +2400,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.completionTimeout(timeout)) + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels + */ + def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + completionTimeout(timeout.asScala) + } + /** * If the time between two processed elements exceeds the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -2266,9 +2435,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.idleTimeout(timeout)) + /** + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels + */ + def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + idleTimeout(timeout.asScala) + } + /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -2282,9 +2471,29 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def backpressureTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.backpressureTimeout(timeout)) + /** + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + backpressureTimeout(timeout.asScala) + } + /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. @@ -2302,9 +2511,33 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = new Source(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + keepAlive(maxIdle.asScala, injectedElem) + } + /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate * for emitting messages. This combinator works for streams where all elements have the same cost or length. @@ -2341,10 +2574,54 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(elements, per, maximumBurst, mode)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, + mode: ThrottleMode): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(elements, per.asScala, maximumBurst, mode) + } + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -2384,10 +2661,57 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(cost, per.asScala, maximumBurst, costCalculation, mode) + } + /** * This is a simplified version of throttle that spreads events evenly across the given time interval. * @@ -2398,6 +2722,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(elements, per, Int.MaxValue, mode)) @@ -2411,10 +2737,43 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(elements, per.asScala, mode) + } + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: java.time.Duration, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(cost, per.asScala, costCalculation, mode) + } + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. @@ -2458,9 +2817,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.initialDelay(delay)) + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay is already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialDelay(delay.asScala) + } + /** * Replace the attributes of this [[Source]] with the given ones. If this Source is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index b8710e91d4..08def1fd98 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -58,7 +58,7 @@ object StreamConverters { * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible * to read the values produced by the stream this Sink is attached to. * - * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] to explicitly + * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] or [[#inputStream(java.time.Duration)]] to explicitly * configure the timeout. * * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. @@ -85,9 +85,30 @@ object StreamConverters { * * @param readTimeout the max time the read operation on the materialized InputStream should block */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def asInputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream(readTimeout)) + /** + * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * + * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and + * closing the [[InputStream]] will cancel this [[Sink]]. + * + * @param readTimeout the max time the read operation on the materialized InputStream should block + */ + def asInputStream(readTimeout: java.time.Duration): Sink[ByteString, InputStream] = { + import akka.util.JavaDurationConverters._ + asInputStream(readTimeout.asScala) + } + /** * Creates a Source from an [[java.io.InputStream]] created by the given function. * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements. @@ -136,13 +157,34 @@ object StreamConverters { * * @param writeTimeout the max time the write operation on the materialized OutputStream should block */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def asOutputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = new Source(scaladsl.StreamConverters.asOutputStream(writeTimeout)) + /** + * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or + * set it for a given Source by using [[akka.stream.ActorAttributes]]. + * + * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] + * will complete this [[Source]]. + * + * @param writeTimeout the max time the write operation on the materialized OutputStream should block + */ + def asOutputStream(writeTimeout: java.time.Duration): javadsl.Source[ByteString, OutputStream] = { + import akka.util.JavaDurationConverters._ + asOutputStream(writeTimeout.asScala) + } + /** * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible * to write the ByteStrings to the stream this Source is attached to. The write timeout for OutputStreams - * materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] if you want to override it. + * materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] or [[#outputStream(java.time.Duration)]] if you want to override it. * * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 039da712c9..a35de25c93 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -606,9 +606,34 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWithin(n: Int, d: FiniteDuration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered + * + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWithin(n, d.asScala) + } + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the weight of the elements, whatever happens first. @@ -627,9 +652,34 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWeightedWithin(maxWeight, costFn, d.asScala) + } + /** * Shifts elements emission in time by a specified amount. It allows to store elements * in internal buffer while waiting for next element to be emitted. Depending on the defined @@ -655,9 +705,41 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = new SubFlow(delegate.delay(of, strategy)) + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + delay(of.asScala, strategy) + } + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -684,9 +766,27 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def dropWithin(d: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + * + * '''Emits when''' the specified time elapsed and a new upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + dropWithin(d.asScala) + } + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time, @@ -873,9 +973,33 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels or timer fires */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def takeWithin(d: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + * + * '''Emits when''' an upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or timer fires + * + * '''Cancels when''' downstream cancels or timer fires + */ + def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + takeWithin(d.asScala) + } + /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the @@ -1332,9 +1456,28 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.initialTimeout(timeout)) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels + */ + def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialTimeout(timeout.asScala) + } + /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. @@ -1347,9 +1490,28 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def completionTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.completionTimeout(timeout)) + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels + */ + def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + completionTimeout(timeout.asScala) + } + /** * If the time between two processed elements exceeds the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -1363,9 +1525,29 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def idleTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.idleTimeout(timeout)) + /** + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels + */ + def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + idleTimeout(timeout.asScala) + } + /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -1379,9 +1561,29 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def backpressureTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.backpressureTimeout(timeout)) + /** + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + backpressureTimeout(timeout.asScala) + } + /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. @@ -1399,9 +1601,33 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + keepAlive(maxIdle.asScala, injectedElem) + } + /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate * for emitting messages. This combinator works for streams where all elements have the same cost or length. @@ -1438,10 +1664,54 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(elements, per, maximumBurst, mode)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, + mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(elements, per.asScala, maximumBurst, mode) + } + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -1481,10 +1751,57 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(cost, per.asScala, maximumBurst, costCalculation, mode) + } + /** * This is a simplified version of throttle that spreads events evenly across the given time interval. * @@ -1495,6 +1812,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) @@ -1508,10 +1827,43 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(elements, per.asScala, mode) + } + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(cost, per.asScala, costCalculation, mode) + } + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. @@ -1537,9 +1889,27 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialDelay(delay: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.initialDelay(delay)) + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay is already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialDelay(delay.asScala) + } + /** * Change the attributes of this [[Source]] to the given ones and seal the list * of attributes. This means that further calls will not be able to remove these diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 478573b92a..970a9626a6 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -7,13 +7,13 @@ package akka.stream.javadsl import akka.NotUsed import akka.event.LoggingAdapter import akka.japi.function +import akka.japi.Util import akka.stream._ import akka.util.ConstantFun import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration -import akka.japi.Util import java.util.Comparator import java.util.concurrent.CompletionStage @@ -597,9 +597,34 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWithin(n: Int, d: FiniteDuration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the given number of elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or `n` elements is buffered + * + * '''Backpressures when''' downstream backpressures, and there are `n+1` buffered elements + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `n` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWithin(n, d.asScala) + } + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the weight of the elements, whatever happens first. @@ -618,9 +643,34 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) + /** + * Chunk up this stream into groups of elements received within a time window, + * or limited by the weight of the elements, whatever happens first. + * Empty groups will not be emitted if no elements are received from upstream. + * The last group before end-of-stream will contain the buffered elements + * since the previously emitted group. + * + * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached + * + * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` + * + * '''Completes when''' upstream completes (emits last group) + * + * '''Cancels when''' downstream completes + * + * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise + * IllegalArgumentException is thrown. + */ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = { + import akka.util.JavaDurationConverters._ + groupedWeightedWithin(maxWeight, costFn, d.asScala) + } + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -647,9 +697,27 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def dropWithin(d: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.dropWithin(d)) + /** + * Discard the elements received within the given duration at beginning of the stream. + * + * '''Emits when''' the specified time elapsed and a new upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + dropWithin(d.asScala) + } + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time, @@ -733,9 +801,41 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = new SubSource(delegate.delay(of, strategy)) + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + delay(of.asScala, strategy) + } + /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. @@ -858,9 +958,33 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels or timer fires */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def takeWithin(d: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.takeWithin(d)) + /** + * Terminate processing (and cancel the upstream publisher) after the given + * duration. Due to input buffering some elements may have been + * requested from upstream publishers that will then not be processed downstream + * of this step. + * + * Note that this can be combined with [[#take]] to limit the number of elements + * within the duration. + * + * '''Emits when''' an upstream element arrives + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or timer fires + * + * '''Cancels when''' downstream cancels or timer fires + */ + def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + takeWithin(d.asScala) + } + /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the @@ -1317,9 +1441,28 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.initialTimeout(timeout)) + /** + * If the first element has not passed through this stage before the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels + */ + def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialTimeout(timeout.asScala) + } + /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. @@ -1332,9 +1475,28 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def completionTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.completionTimeout(timeout)) + /** + * If the completion of the stream does not happen until the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels + */ + def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + completionTimeout(timeout.asScala) + } + /** * If the time between two processed elements exceeds the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -1348,9 +1510,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def idleTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.idleTimeout(timeout)) + /** + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels + */ + def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + idleTimeout(timeout.asScala) + } + /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, @@ -1364,9 +1546,29 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.backpressureTimeout(timeout)) + /** + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + backpressureTimeout(timeout.asScala) + } + /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. @@ -1384,9 +1586,33 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = new SubSource(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + keepAlive(maxIdle.asScala, injectedElem) + } + /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate * for emitting messages. This combinator works for streams where all elements have the same cost or length. @@ -1423,10 +1649,54 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(elements, per, maximumBurst, mode)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, + mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(elements, per.asScala, maximumBurst, mode) + } + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -1466,10 +1736,57 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * @see [[#throttleEven]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and started. + * + * Parameter `mode` manages behavior when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing + * the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce + * most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds). + * + * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering + * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in + * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting + * events being evenly spread with some small interval (30 milliseconds or less). + * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it + * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @see [[#throttleEven]] + */ + def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttle(cost, per.asScala, maximumBurst, costCalculation, mode) + } + /** * This is a simplified version of throttle that spreads events evenly across the given time interval. * @@ -1480,6 +1797,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode)) @@ -1493,10 +1812,43 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(elements, per.asScala, mode) + } + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + def throttleEven(cost: Int, per: java.time.Duration, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + throttleEven(cost, per.asScala, costCalculation, mode) + } + /** * Detaches upstream demand from downstream demand without detaching the * stream rates; in other words acts like a buffer of size 1. @@ -1522,9 +1874,27 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.initialDelay(delay)) + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay is already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = { + import akka.util.JavaDurationConverters._ + initialDelay(delay.asScala) + } + /** * Change the attributes of this [[Source]] to the given ones and seal the list * of attributes. This means that further calls will not be able to remove these