automatic throttle burst size, #24699 (#24784)

* automatic throttle burst size, #24699

* throttleEven used maxBurst=Int.MaxValue, which in practice resulted
  in no throttling at all at high rates
* The original problem that throttleEven tried to solve was that when
  the throttle rate (cost / per) was high, the cost of scheduling
  dominated and the actual rate became much lower than the given rate
  if maxBurst was 0 or low. It was difficult for the user to know what
  maxBurst to use.
* In fact, that was already the case for rates > 30/s.
* This is fixed by automatically adjusting the maxBurst for higher
  throttle rates in a new throttle overload that doesn't include the
  maxBurst parameter
* Also skipped the mode parameter for that variant, since Shaping is
  almost always what you want; otherwise you can use the full
  signature with a suitable maxBurst
* Deprecated throttleEven, since the new overload covers the same use
  case. Also fixed the implementation of throttleEven to use the
  automatic burst size, since Int.MaxValue is not useful at all.
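For illustration, a minimal scaladsl sketch of the change from the user's point of view (the rate and burst values below are arbitrary examples, not taken from the commit):

```
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ThrottleMode }
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object ThrottleOverloadExample extends App {
  implicit val system = ActorSystem("throttle-example")
  implicit val materializer = ActorMaterializer()

  val source = Source(1 to 1000)

  // Before: the burst size had to be picked manually (and throttleEven is now deprecated)
  source.throttle(300, 1.second, maximumBurst = 30, ThrottleMode.Shaping)

  // After: the burst size is derived from the rate (about 100 ms worth of tokens,
  // i.e. 30 here) and the mode is Shaping
  source.throttle(300, 1.second)
}
```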
This commit is contained in:
Patrik Nordwall 2018-04-02 16:07:16 +02:00 committed by Christopher Batey
parent ee85d23a3e
commit 6f330ef69c
14 changed files with 692 additions and 364 deletions

View file

@ -690,7 +690,7 @@ implicit val materializer = ActorMaterializer.create(system)
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(100, 1.second, 10, ThrottleMode.Shaping)
.throttle(100, 1.second)
.to(Sink.actorRef(target, NotUsed))
.run()
```
@ -699,7 +699,7 @@ Example in Java:
```
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@ -716,7 +716,7 @@ final Materializer materializer = ActorMaterializer.create(system);
final ActorRef throttler =
Source.actorRef(1000, OverflowStrategy.dropNew())
.throttle(100, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
.throttle(100, Duration.ofSeconds(1))
.to(Sink.actorRef(target, NotUsed.getInstance()))
.run(materializer);
```

View file

@ -186,10 +186,7 @@ All operations so far have been time-independent and could have been performed
in the same fashion on strict collections of elements. The next line
demonstrates that we are in fact dealing with streams that can flow at a
certain speed: we use the `throttle` combinator to slow down the stream to 1
element per second (the second `1` in the argument list is the maximum size
of a burst that we want to allow—passing `1` means that the first element
gets through immediately and the second then has to wait for one second and so
on).
element per second.
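A minimal sketch of the simplified call (assuming an implicit ActorSystem and materializer as in the quickstart); at 1 element per second the automatically derived burst is 1, so the first element is still emitted immediately and the rest follow one second apart:

```
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

implicit val system = ActorSystem("quickstart")
implicit val materializer = ActorMaterializer()

Source(1 to 5)
  .throttle(1, 1.second) // burst size is derived automatically; it is 1 at this rate
  .runForeach(println)
```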
If you run this program you will see one line printed per second. One aspect
that is not immediately visible deserves mention, though: if you try and set

View file

@ -262,7 +262,7 @@ public class HubDocTest extends AbstractJavaTest {
Source<Integer, NotUsed> fromProducer = runnableGraph.run(materializer);
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.throttle(10, Duration.ofMillis(100), 10, ThrottleMode.shaping())
fromProducer.throttle(10, Duration.ofMillis(100))
.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
//#partition-hub-fastest

View file

@ -65,7 +65,7 @@ public class QuickStartDocTest extends AbstractJavaTest {
//#add-streams
factorials
.zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num))
.throttle(1, Duration.ofSeconds(1), 1, ThrottleMode.shaping())
.throttle(1, Duration.ofSeconds(1))
//#add-streams
.take(2)
//#add-streams

View file

@ -181,7 +181,7 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
val fromProducer: Source[Int, NotUsed] = runnableGraph.run()
fromProducer.runForeach(msg ⇒ println("consumer1: " + msg))
fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
fromProducer.throttle(10, 100.millis)
  .runForeach(msg ⇒ println("consumer2: " + msg))
//#partition-hub-fastest
}

View file

@ -62,7 +62,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
//#add-streams
factorials
.zipWith(Source(0 to 100))((num, idx) ⇒ s"$idx! = $num")
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.throttle(1, 1.second)
//#add-streams
.take(3)
//#add-streams

View file

@ -4,7 +4,10 @@
package akka.stream.scaladsl
import akka.stream.ThrottleMode.{ Shaping, Enforcing }
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import akka.stream.ThrottleMode.{ Enforcing, Shaping }
import akka.stream._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
@ -14,6 +17,9 @@ import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NoStackTrace
import akka.Done
import akka.testkit.TimingTest
class FlowThrottleSpec extends StreamSpec {
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1))
@ -22,7 +28,8 @@ class FlowThrottleSpec extends StreamSpec {
"Throttle for single cost elements" must {
"work for the happy case" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 0, Shaping)
//Source(1 to 5).throttle(1, 100.millis, 0, Shaping)
Source(1 to 5).throttle(19, 1000.millis, -1, Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
@ -303,5 +310,67 @@ class FlowThrottleSpec extends StreamSpec {
.request(5)
.expectError(ex)
}
"work for real scenario with automatic burst size" taggedAs TimingTest in Utils.assertAllStagesStopped {
val startTime = System.nanoTime()
val counter1 = new AtomicInteger
val timestamp1 = new AtomicLong(System.nanoTime())
val expectedMinRate = new AtomicInteger
val expectedMaxRate = new AtomicInteger
val (ref, done) = Source.actorRef[Int](bufferSize = 100000, OverflowStrategy.fail)
.throttle(300, 1000.millis)
.toMat(Sink.foreach { elem ⇒
val now = System.nanoTime()
val n1 = counter1.incrementAndGet()
val duration1Millis = (now - timestamp1.get) / 1000 / 1000
if (duration1Millis >= 500) {
val rate = n1 * 1000.0 / duration1Millis
info(f"burst rate after ${(now - startTime).nanos.toMillis} ms at element $elem: $rate%2.2f elements/s ($n1)")
timestamp1.set(now)
counter1.set(0)
if (rate < expectedMinRate.get)
throw new RuntimeException(s"Too low rate, got $rate, expected min ${expectedMinRate.get}, " +
s"after ${(now - startTime).nanos.toMillis} ms at element $elem")
if (rate > expectedMaxRate.get)
throw new RuntimeException(s"Too high rate, got $rate, expected max ${expectedMaxRate.get}, " +
s"after ${(now - startTime).nanos.toMillis} ms at element $elem")
}
})(Keep.both)
.run()
expectedMaxRate.set(200) // sleep (at least) 5 ms between each element
(1 to 2700).foreach { n ⇒
if (!done.isCompleted) {
ref ! n
val now = System.nanoTime()
val elapsed = (now - startTime).nanos
val elapsedMs = elapsed.toMillis
if (elapsedMs >= 500 && elapsedMs <= 3000) {
expectedMinRate.set(100)
} else if (elapsedMs >= 3000 && elapsedMs <= 5000) {
expectedMaxRate.set(350) // could be up to 600 / s, but should be limited by the throttle
if (elapsedMs > 4000) expectedMinRate.set(250)
} else if (elapsedMs > 5000 && elapsedMs <= 8500) {
expectedMinRate.set(100)
} else if (elapsedMs > 10000) {
expectedMaxRate.set(200)
}
// higher rate for a few seconds
if (elapsedMs >= 3000 && elapsedMs <= 5000) {
// could be up to 600 / s, but should be limited by the throttle
if (n % 3 == 0)
Thread.sleep(5)
} else {
// around 200 / s
Thread.sleep(5)
}
}
}
ref ! akka.actor.Status.Success("done")
Await.result(done, 20.seconds) should ===(Done)
}
}
}

View file

@ -12,3 +12,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProc
# #23804 Added extrapolate stage
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate$default$2")
# #24699 throttle overload
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttle")

View file

@ -13,6 +13,13 @@ import akka.util.NanoTimeTokenBucket
import scala.concurrent.duration.{ FiniteDuration, _ }
/**
* INTERNAL API
*/
@InternalApi private[akka] object Throttle {
final val AutomaticMaximumBurst = -1
}
/**
* INTERNAL API
*/
@ -25,17 +32,23 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
extends SimpleLinearGraphStage[T] {
require(cost > 0, "cost must be > 0")
require(per.toNanos > 0, "per time must be > 0")
require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")
require(per.toNanos >= cost, "Rates larger than 1 unit / nanosecond are not supported")
// There is some loss of precision here because of rounding, but this only happens if nanosBetweenTokens is very
// small which is usually at rates where that precision is highly unlikely anyway as the overhead of this stage
// is likely higher than the required accuracy interval.
private val nanosBetweenTokens = per.toNanos / cost
// 100 ms is a realistic minimum between tokens, otherwise the maximumBurst is adjusted
// to be able to support higher rates
val effectiveMaximumBurst =
if (maximumBurst == Throttle.AutomaticMaximumBurst) math.max(1, ((100 * 1000 * 1000) / nanosBetweenTokens))
else maximumBurst
require(!(mode == ThrottleMode.Enforcing && effectiveMaximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")
private val timerName: String = "ThrottleTimer"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val tokenBucket = new NanoTimeTokenBucket(maximumBurst, nanosBetweenTokens)
private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens)
var willStop = false
var currentElement: T = _
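To illustrate the automatic burst calculation above, a small standalone sketch that mirrors the `effectiveMaximumBurst` expression (the object and method names are illustrative, not part of the stage):

```
import scala.concurrent.duration._

object AutomaticBurstSketch extends App {
  def automaticBurst(cost: Int, per: FiniteDuration): Long = {
    val nanosBetweenTokens = per.toNanos / cost
    // roughly 100 ms worth of tokens, but never less than 1
    math.max(1L, (100L * 1000 * 1000) / nanosBetweenTokens)
  }

  println(automaticBurst(5, 1.second))   // rate 5/s   => burst 1
  println(automaticBurst(100, 1.second)) // rate 100/s => burst 10
  println(automaticBurst(300, 1.second)) // rate 300/s => burst 30
}
```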

View file

@ -15,6 +15,7 @@ import scala.concurrent.duration.FiniteDuration
import akka.japi.Util
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import akka.util.JavaDurationConverters._
import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
@ -1059,10 +1060,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] =
groupedWithin(n, d.asScala)
}
/**
* Chunk up this stream into groups of elements received within a time window,
@ -1105,10 +1104,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala)
}
/**
* Shifts elements emission in time by a specified amount. It allows to store elements
@ -1165,10 +1162,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
delay(of.asScala, strategy)
}
/**
* Discard the given number of elements at the beginning of the stream.
@ -1212,10 +1207,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] =
dropWithin(d.asScala)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
@ -1434,10 +1427,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* See also [[Flow.limit]], [[Flow.limitWeighted]]
*/
def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] =
takeWithin(d.asScala)
}
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -2389,10 +2380,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] =
initialTimeout(timeout.asScala)
}
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
@ -2423,10 +2412,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] =
completionTimeout(timeout.asScala)
}
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
@ -2459,10 +2446,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] =
idleTimeout(timeout.asScala)
}
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
@ -2495,10 +2480,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] =
backpressureTimeout(timeout.asScala)
}
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
@ -2539,10 +2522,41 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] =
keepAlive(maxIdle.asScala, injectedElem)
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough, the throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream is materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(elements, per.asScala))
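To make the token bucket description in the doc comment above concrete, a self-contained sketch of the model (an illustration only, not Akka's NanoTimeTokenBucket; names and structure are assumptions):

```
final class TokenBucketSketch(capacity: Long, nanosBetweenTokens: Long) {
  // the bucket starts full, as described above
  private var tokens = capacity
  private var lastUpdate = System.nanoTime()

  /** Nanoseconds to wait before an element of the given cost may be emitted. */
  def offer(cost: Long): Long = {
    val now = System.nanoTime()
    val newTokens = (now - lastUpdate) / nanosBetweenTokens
    lastUpdate += newTokens * nanosBetweenTokens
    tokens = math.min(capacity, tokens + newTokens)
    tokens -= cost // may go negative: a deficit that future refills pay back
    if (tokens >= 0) 0L else -tokens * nanosBetweenTokens
  }
}

// e.g. 10 elements/second with a burst of 5:
// val bucket = new TokenBucketSketch(capacity = 5, nanosBetweenTokens = 100000000L)
```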
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
@ -2565,10 +2579,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2578,7 +2593,6 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -2607,10 +2621,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2620,13 +2635,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(elements, per.asScala, maximumBurst, mode)
}
mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
@ -2652,10 +2664,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2665,7 +2678,6 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -2673,6 +2685,42 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `calculateCost` function.
* This combinator works for streams where elements have different cost (length),
* streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough, the throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream is materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per.asScala, costCalculation.apply))
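Since the cost-based variant above is mirrored in the scaladsl API (this method delegates to it), a minimal sketch of throttling a `ByteString` stream by byte count; the 1 MB/s limit and chunk sizes are arbitrary examples:

```
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.concurrent.duration._

object CostThrottleSketch extends App {
  implicit val system = ActorSystem("cost-throttle")
  implicit val materializer = ActorMaterializer()

  val chunks = Source(List.fill(10)(ByteString("x" * 1024)))

  chunks
    .throttle(1024 * 1024, 1.second, _.length) // charge each chunk by its size in bytes
    .runWith(Sink.ignore)
}
```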
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculating for each element individually by calling `calculateCost` function.
@ -2697,10 +2745,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2710,13 +2759,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(cost, per.asScala, maximumBurst, costCalculation, mode)
}
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2731,22 +2777,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttleEven(elements, per.asScala, mode)
}
new Flow(delegate.throttleEven(elements, per, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2759,10 +2790,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode))
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
throttleEven(elements, per.asScala, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2774,11 +2804,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttleEven(cost, per, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
throttleEven(cost, per.asScala, costCalculation, mode)
}
/**
* Detaches upstream demand from downstream demand without detaching the
@ -2839,10 +2885,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] =
initialDelay(delay.asScala)
}
/**
* Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite

View file

@ -13,6 +13,7 @@ import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.stream.impl.{ LinearTraversalBuilder, SourceQueueAdapter }
import akka.util.{ ConstantFun, Timeout }
import akka.util.JavaDurationConverters._
import akka.{ Done, NotUsed }
import org.reactivestreams.{ Publisher, Subscriber }
@ -216,10 +217,8 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = {
import akka.util.JavaDurationConverters._
def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] =
Source.tick(initialDelay.asScala, interval.asScala, tick)
}
/**
* Create a `Source` with one element.
@ -1730,10 +1729,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
groupedWithin(n, d.asScala)
}
/**
* Chunk up this stream into groups of elements received within a time window,
@ -1776,10 +1773,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala)
}
/**
* Shifts elements emission in time by a specified amount. It allows to store elements
@ -1836,10 +1831,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] =
delay(of.asScala, strategy)
}
/**
* Discard the given number of elements at the beginning of the stream.
@ -1883,10 +1876,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] =
dropWithin(d.asScala)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
@ -1984,10 +1975,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] =
takeWithin(d.asScala)
}
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -2441,10 +2430,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
initialTimeout(timeout.asScala)
}
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
@ -2475,10 +2462,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
completionTimeout(timeout.asScala)
}
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
@ -2511,10 +2496,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
idleTimeout(timeout.asScala)
}
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
@ -2547,10 +2530,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] =
backpressureTimeout(timeout.asScala)
}
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
@ -2591,10 +2572,41 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] =
keepAlive(maxIdle.asScala, injectedElem)
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough, the throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream is materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per.asScala))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
@ -2617,10 +2629,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2630,7 +2643,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -2659,10 +2671,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2672,13 +2685,46 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(elements, per.asScala, maximumBurst, mode)
}
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `calculateCost` function.
* This combinator works for streams where elements have different cost (length),
* streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough, the throttle waits until the
* bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket is full when the stream is materialized and
* started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per.asScala, costCalculation.apply _))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
@ -2704,10 +2750,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2717,7 +2764,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -2749,10 +2795,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2762,13 +2809,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(cost, per.asScala, maximumBurst, costCalculation, mode)
}
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2781,24 +2825,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per, Int.MaxValue, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttleEven(elements, per.asScala, mode)
}
new Source(delegate.throttleEven(elements, per, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2811,10 +2840,25 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] =
throttleEven(elements, per.asScala, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode))
new Source(delegate.throttleEven(cost, per, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2826,11 +2870,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: java.time.Duration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] =
throttleEven(cost, per.asScala, costCalculation, mode)
}
/**
* Detaches upstream demand from downstream demand without detaching the
@ -2891,10 +2935,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] =
initialDelay(delay.asScala)
}
/**
* Replace the attributes of this [[Source]] with the given ones. If this Source is a composite

View file

@ -9,6 +9,7 @@ import akka.event.LoggingAdapter
import akka.japi.function
import akka.stream._
import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
@ -629,10 +630,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
groupedWithin(n, d.asScala)
}
/**
* Chunk up this stream into groups of elements received within a time window,
@ -675,10 +674,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala)
}
/**
* Shifts elements emission in time by a specified amount. It allows to store elements
@ -735,10 +732,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
delay(of.asScala, strategy)
}
/**
* Discard the given number of elements at the beginning of the stream.
@ -782,10 +777,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] =
dropWithin(d.asScala)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
@ -995,10 +988,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] =
takeWithin(d.asScala)
}
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -1531,10 +1522,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] =
initialTimeout(timeout.asScala)
}
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
@ -1565,10 +1554,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] =
completionTimeout(timeout.asScala)
}
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
@ -1601,10 +1588,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] =
idleTimeout(timeout.asScala)
}
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
@ -1637,10 +1622,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] =
backpressureTimeout(timeout.asScala)
}
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
@ -1681,10 +1664,41 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] =
keepAlive(maxIdle.asScala, injectedElem)
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`elements/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(elements, per.asScala))
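// Usage sketch (illustrative, assuming a plain scaladsl Source and `import scala.concurrent.duration._`):
// with this overload the burst size and mode are picked automatically, so for a 200/second rate
//   Source(1 to 1000).throttle(200, 1.second)
// behaves like the fully specified variant with the burst size 20 from the table above and Shaping mode:
//   Source(1 to 1000).throttle(200, 1.second, 20, ThrottleMode.Shaping)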
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
@ -1707,10 +1721,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1720,7 +1735,6 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -1749,10 +1763,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1762,13 +1777,46 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(elements, per.asScala, maximumBurst, mode)
}
mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This combinator works for streams where elements have different cost (length),
* streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(cost, per.asScala, costCalculation.apply))
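// Cost-based usage sketch (illustrative, assuming a scaladsl Source[ByteString] named `bytes` and
// `import scala.concurrent.duration._`): limit throughput to roughly 1000 bytes per second by
// charging each element its size in bytes.
//   bytes.throttle(1000, 1.second, _.size)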
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
@ -1794,10 +1842,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1807,7 +1856,6 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -1839,10 +1887,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1852,13 +1901,10 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(cost, per.asScala, maximumBurst, costCalculation, mode)
}
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1871,24 +1917,9 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
throttleEven(elements, per.asScala, mode)
}
new SubFlow(delegate.throttleEven(elements, per, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1901,10 +1932,25 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
throttleEven(elements, per.asScala, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events you need to use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
new SubFlow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode))
new SubFlow(delegate.throttleEven(cost, per, costCalculation.apply, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1916,11 +1962,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] =
throttleEven(cost, per.asScala, costCalculation, mode)
}
/**
* Detaches upstream demand from downstream demand without detaching the
@ -1963,10 +2009,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] =
initialDelay(delay.asScala)
}
/**
* Change the attributes of this [[Source]] to the given ones and seal the list

View file

@ -10,6 +10,7 @@ import akka.japi.function
import akka.japi.Util
import akka.stream._
import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import scala.collection.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
@ -620,10 +621,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
groupedWithin(n, d.asScala)
}
/**
* Chunk up this stream into groups of elements received within a time window,
@ -666,10 +665,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = {
import akka.util.JavaDurationConverters._
def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] =
groupedWeightedWithin(maxWeight, costFn, d.asScala)
}
/**
* Discard the given number of elements at the beginning of the stream.
@ -713,10 +710,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def dropWithin(d: java.time.Duration): SubSource[Out, Mat] =
dropWithin(d.asScala)
}
/**
* Terminate processing (and cancel the upstream publisher) after predicate
@ -831,10 +826,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] =
delay(of.asScala, strategy)
}
/**
* Recover allows sending a last element on failure and gracefully completing the stream
@ -980,10 +973,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def takeWithin(d: java.time.Duration): SubSource[Out, Mat] =
takeWithin(d.asScala)
}
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
@ -1516,10 +1507,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
initialTimeout(timeout.asScala)
}
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
@ -1550,10 +1539,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
completionTimeout(timeout.asScala)
}
/**
* If the time between two processed elements exceeds the provided timeout, the stream is failed
@ -1586,10 +1573,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
idleTimeout(timeout.asScala)
}
/**
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
@ -1622,10 +1607,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] =
backpressureTimeout(timeout.asScala)
}
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
@ -1666,10 +1649,41 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] =
keepAlive(maxIdle.asScala, injectedElem)
}
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`elements/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(elements: Int, per: java.time.Duration): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(elements, per.asScala))
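// Behaviour sketch implied by the description above: the bucket starts full, so the first
// `burst size` elements pass immediately and the configured rate applies afterwards. For example,
// with throttle(100, 1.second) roughly the first 10 elements (burst size 10) are emitted without delay.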
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
@ -1692,10 +1706,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1705,7 +1720,6 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -1734,10 +1748,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1747,13 +1762,46 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int,
mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(elements, per.asScala, maximumBurst, mode)
}
mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This combinator works for streams where elements have different cost (length),
* streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(cost, per.asScala, costCalculation.apply _))
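// Illustrative sketch (assuming a scaladsl Source of Seq[T] batches named `batches` and
// `import scala.concurrent.duration._`): throttle to about 500 individual items per second by
// charging each batch its size.
//   batches.throttle(500, 1.second, _.size)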
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
@ -1779,10 +1827,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1792,7 +1841,6 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@ -1824,10 +1872,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -1837,13 +1886,10 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttle(cost, per.asScala, maximumBurst, costCalculation, mode)
}
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1856,24 +1902,9 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you need just slow down a stream without worrying about exact amount
* of time between events.
*
* If you want to be sure that no time interval has no more than specified number of events you need to use
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
throttleEven(elements, per.asScala, mode)
}
new SubSource(delegate.throttleEven(elements, per, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1886,10 +1917,25 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
throttleEven(elements, per.asScala, mode)
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
*
* Use this combinator when you just need to slow down a stream without worrying about the exact amount
* of time between events.
*
* If you want to be sure that no time interval contains more than the specified number of events you need to use
* [[throttle()]] with the maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode))
new SubSource(delegate.throttleEven(cost, per, costCalculation.apply _, mode))
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -1901,11 +1947,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: java.time.Duration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] =
throttleEven(cost, per.asScala, costCalculation, mode)
}
/**
* Detaches upstream demand from downstream demand without detaching the
@ -1948,10 +1994,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = {
import akka.util.JavaDurationConverters._
def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] =
initialDelay(delay.asScala)
}
/**
* Change the attributes of this [[Source]] to the given ones and seal the list

View file

@ -2047,6 +2047,38 @@ trait FlowOps[+Out, +Mat] {
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () ⇒ U): Repr[U] =
via(new Timers.IdleInject[Out, U](maxIdle, injectedElem))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`elements/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(elements: Int, per: FiniteDuration): Repr[Out] =
throttle(elements, per, maximumBurst = Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, ThrottleMode.Shaping)
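// Sketch of the automatic burst size described above, assuming it is derived as max(1, 0.1 * rate);
// Throttle.AutomaticMaximumBurst presumably acts as a marker value and the actual calculation happens
// inside the Throttle stage.
//   def automaticBurst(elements: Int, per: FiniteDuration): Int =
//     math.max(1, (0.1 * elements * 1000 / per.toMillis).toInt)
//   automaticBurst(100, 1.second) // 10
//   automaticBurst(5, 1.second)   // 1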
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage sets the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
@ -2069,10 +2101,11 @@ trait FlowOps[+Out, +Mat] {
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2082,11 +2115,45 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] =
throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode)
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
* This combinator works for streams where elements have different cost (length),
* streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size).
* Tokens drop into the bucket at a given rate and can be `spared` for later use up to the bucket capacity
* to allow some burstiness. Whenever the stream wants to send an element, it takes as many
* tokens from the bucket as the element costs. If there are not enough tokens, throttle waits until the
* bucket accumulates enough. Elements that cost more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate. The bucket starts full when the stream is
* materialized and started.
*
* The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example:
* - rate < 20/second => burst size 1
* - rate 20/second => burst size 2
* - rate 100/second => burst size 10
* - rate 200/second => burst size 20
*
* The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet the throttle rate.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def throttle(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int): Repr[Out] =
via(new Throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, ThrottleMode.Shaping))
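// Worked example of the "delayed proportionally" rule above (illustrative, assuming a scaladsl
// Source[ByteString] named `bytes` and `import scala.concurrent.duration._`): with
//   bytes.throttle(10, 1.second, _.size)
// tokens are replenished at 10 per second, so a 50-byte element arriving at an empty bucket is
// held back for roughly 5 seconds (50 missing tokens at 10 tokens/second) before being emitted.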
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculated for each element individually by calling the `costCalculation` function.
@ -2111,10 +2178,11 @@ trait FlowOps[+Out, +Mat] {
*
* WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering
* next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in
* case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting
* events being evenly spread with some small interval (30 milliseconds or less).
* In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it
* enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
* case burst is 0 and the speed is higher than 30 events per second. You need to increase the `maximumBurst` if
* elements arrive with a small interval (30 milliseconds or less). Use the overloaded `throttle` method without the
* `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`).
* In other words, the throttler always enforces the rate limit when the `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
@ -2124,7 +2192,6 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*
* @see [[#throttleEven]]
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] =
@ -2141,8 +2208,10 @@ trait FlowOps[+Out, +Mat] {
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): Repr[Out] =
throttle(elements, per, Int.MaxValue, ConstantFun.oneInt, mode)
throttle(elements, per, Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, mode)
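// Migration sketch for the deprecation above (illustrative): the deprecated
//   stream.throttleEven(100, 1.second, ThrottleMode.Shaping)
// can be replaced by the overload without `maximumBurst`, which picks the burst size automatically:
//   stream.throttle(100, 1.second)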
/**
* This is a simplified version of throttle that spreads events evenly across the given time interval.
@ -2154,9 +2223,11 @@ trait FlowOps[+Out, +Mat] {
* [[throttle()]] with maximumBurst attribute.
* @see [[#throttle]]
*/
@Deprecated
@deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12")
def throttleEven(cost: Int, per: FiniteDuration,
costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] =
via(new Throttle(cost, per, Int.MaxValue, costCalculation, mode))
throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, mode)
/**
* Detaches upstream demand from downstream demand without detaching the