diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.4.x-2.5.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.4.x-2.5.x.md index c94d7312b3..88fecdd9a7 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.4.x-2.5.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.4.x-2.5.x.md @@ -690,7 +690,7 @@ implicit val materializer = ActorMaterializer.create(system) val throttler: ActorRef = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew) - .throttle(100, 1.second, 10, ThrottleMode.Shaping) + .throttle(100, 1.second) .to(Sink.actorRef(target, NotUsed)) .run() ``` @@ -699,7 +699,7 @@ Example in Java: ``` import java.util.concurrent.TimeUnit; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -716,7 +716,7 @@ final Materializer materializer = ActorMaterializer.create(system); final ActorRef throttler = Source.actorRef(1000, OverflowStrategy.dropNew()) - .throttle(100, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping()) + .throttle(100, Duration.ofSeconds(1)) .to(Sink.actorRef(target, NotUsed.getInstance())) .run(materializer); ``` diff --git a/akka-docs/src/main/paradox/stream/stream-quickstart.md b/akka-docs/src/main/paradox/stream/stream-quickstart.md index 33b9c4e485..66930d2cd4 100644 --- a/akka-docs/src/main/paradox/stream/stream-quickstart.md +++ b/akka-docs/src/main/paradox/stream/stream-quickstart.md @@ -186,10 +186,7 @@ All operations so far have been time-independent and could have been performed in the same fashion on strict collections of elements. The next line demonstrates that we are in fact dealing with streams that can flow at a certain speed: we use the `throttle` combinator to slow down the stream to 1 -element per second (the second `1` in the argument list is the maximum size -of a burst that we want to allow—passing `1` means that the first element -gets through immediately and the second then has to wait for one second and so -on). +element per second. If you run this program you will see one line printed per second. One aspect that is not immediately visible deserves mention, though: if you try and set diff --git a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java index cef8801d3c..06b592ac6b 100644 --- a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java @@ -262,7 +262,7 @@ public class HubDocTest extends AbstractJavaTest { Source fromProducer = runnableGraph.run(materializer); fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer); - fromProducer.throttle(10, Duration.ofMillis(100), 10, ThrottleMode.shaping()) + fromProducer.throttle(10, Duration.ofMillis(100)) .runForeach(msg -> System.out.println("consumer2: " + msg), materializer); //#partition-hub-fastest diff --git a/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java b/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java index 5b310f50ba..ceafd900c8 100644 --- a/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java @@ -65,7 +65,7 @@ public class QuickStartDocTest extends AbstractJavaTest { //#add-streams factorials .zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num)) - .throttle(1, Duration.ofSeconds(1), 1, ThrottleMode.shaping()) + .throttle(1, Duration.ofSeconds(1)) //#add-streams .take(2) //#add-streams diff --git a/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala index 9be5024025..8dfa856e12 100644 --- a/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala @@ -181,7 +181,7 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec { val fromProducer: Source[Int, NotUsed] = runnableGraph.run() fromProducer.runForeach(msg ⇒ println("consumer1: " + msg)) - fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping) + fromProducer.throttle(10, 100.millis) .runForeach(msg ⇒ println("consumer2: " + msg)) //#partition-hub-fastest } diff --git a/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala b/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala index 1609b54c53..5f686aaa05 100644 --- a/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala @@ -62,7 +62,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture //#add-streams factorials .zipWith(Source(0 to 100))((num, idx) ⇒ s"$idx! = $num") - .throttle(1, 1.second, 1, ThrottleMode.shaping) + .throttle(1, 1.second) //#add-streams .take(3) //#add-streams diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala index 8c6c3c4f7d..e2a0e3cf40 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -4,7 +4,10 @@ package akka.stream.scaladsl -import akka.stream.ThrottleMode.{ Shaping, Enforcing } +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +import akka.stream.ThrottleMode.{ Enforcing, Shaping } import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink @@ -14,6 +17,9 @@ import scala.concurrent.duration._ import scala.util.Random import scala.util.control.NoStackTrace +import akka.Done +import akka.testkit.TimingTest + class FlowThrottleSpec extends StreamSpec { implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1)) @@ -22,7 +28,8 @@ class FlowThrottleSpec extends StreamSpec { "Throttle for single cost elements" must { "work for the happy case" in Utils.assertAllStagesStopped { - Source(1 to 5).throttle(1, 100.millis, 0, Shaping) + //Source(1 to 5).throttle(1, 100.millis, 0, Shaping) + Source(1 to 5).throttle(19, 1000.millis, -1, Shaping) .runWith(TestSink.probe[Int]) .request(5) .expectNext(1, 2, 3, 4, 5) @@ -303,5 +310,67 @@ class FlowThrottleSpec extends StreamSpec { .request(5) .expectError(ex) } + + "work for real scenario with automatic burst size" taggedAs TimingTest in Utils.assertAllStagesStopped { + val startTime = System.nanoTime() + val counter1 = new AtomicInteger + val timestamp1 = new AtomicLong(System.nanoTime()) + val expectedMinRate = new AtomicInteger + val expectedMaxRate = new AtomicInteger + val (ref, done) = Source.actorRef[Int](bufferSize = 100000, OverflowStrategy.fail) + .throttle(300, 1000.millis) + .toMat(Sink.foreach { elem ⇒ + val now = System.nanoTime() + val n1 = counter1.incrementAndGet() + val duration1Millis = (now - timestamp1.get) / 1000 / 1000 + if (duration1Millis >= 500) { + val rate = n1 * 1000.0 / duration1Millis + info(f"burst rate after ${(now - startTime).nanos.toMillis} ms at element $elem: $rate%2.2f elements/s ($n1)") + timestamp1.set(now) + counter1.set(0) + if (rate < expectedMinRate.get) + throw new RuntimeException(s"Too low rate, got $rate, expected min ${expectedMinRate.get}, " + + s"after ${(now - startTime).nanos.toMillis} ms at element $elem") + if (rate > expectedMaxRate.get) + throw new RuntimeException(s"Too high rate, got $rate, expected max ${expectedMaxRate.get}, " + + s"after ${(now - startTime).nanos.toMillis} ms at element $elem") + } + })(Keep.both) + .run() + + expectedMaxRate.set(200) // sleep (at least) 5 ms between each element + (1 to 2700).foreach { n ⇒ + if (!done.isCompleted) { + ref ! n + val now = System.nanoTime() + val elapsed = (now - startTime).nanos + val elapsedMs = elapsed.toMillis + if (elapsedMs >= 500 && elapsedMs <= 3000) { + expectedMinRate.set(100) + } else if (elapsedMs >= 3000 && elapsedMs <= 5000) { + expectedMaxRate.set(350) // could be up to 600 / s, but should be limited by the throttle + if (elapsedMs > 4000) expectedMinRate.set(250) + } else if (elapsedMs > 5000 && elapsedMs <= 8500) { + expectedMinRate.set(100) + } else if (elapsedMs > 10000) { + expectedMaxRate.set(200) + } + + // higher rate for a few seconds + if (elapsedMs >= 3000 && elapsedMs <= 5000) { + // could be up to 600 / s, but should be limited by the throttle + if (n % 3 == 0) + Thread.sleep(5) + } else { + // around 200 / s + Thread.sleep(5) + } + } + } + ref ! akka.actor.Status.Success("done") + + Await.result(done, 20.seconds) should ===(Done) + } + } } diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes index e64a8b455e..ac34b69d3f 100644 --- a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -11,4 +11,8 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProc # #23804 Added extrapolate stage ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate$default$2") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate$default$2") + +# #24699 throttle overload +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.throttle") + diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 827a3704f8..fbcf9a256d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -13,6 +13,13 @@ import akka.util.NanoTimeTokenBucket import scala.concurrent.duration.{ FiniteDuration, _ } +/** + * INTERNAL API + */ +@InternalApi private[akka] object Throttle { + final val AutomaticMaximumBurst = -1 +} + /** * INTERNAL API */ @@ -25,17 +32,23 @@ import scala.concurrent.duration.{ FiniteDuration, _ } extends SimpleLinearGraphStage[T] { require(cost > 0, "cost must be > 0") require(per.toNanos > 0, "per time must be > 0") - require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") require(per.toNanos >= cost, "Rates larger than 1 unit / nanosecond are not supported") // There is some loss of precision here because of rounding, but this only happens if nanosBetweenTokens is very // small which is usually at rates where that precision is highly unlikely anyway as the overhead of this stage // is likely higher than the required accuracy interval. private val nanosBetweenTokens = per.toNanos / cost + // 100 ms is a realistic minimum between tokens, otherwise the maximumBurst is adjusted + // to be able to support higher rates + val effectiveMaximumBurst = + if (maximumBurst == Throttle.AutomaticMaximumBurst) math.max(1, ((100 * 1000 * 1000) / nanosBetweenTokens)) + else maximumBurst + require(!(mode == ThrottleMode.Enforcing && effectiveMaximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") + private val timerName: String = "ThrottleTimer" override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private val tokenBucket = new NanoTimeTokenBucket(maximumBurst, nanosBetweenTokens) + private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens) var willStop = false var currentElement: T = _ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8d99105dff..b37b30e2c8 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -15,6 +15,7 @@ import scala.concurrent.duration.FiniteDuration import akka.japi.Util import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage +import akka.util.JavaDurationConverters._ import akka.actor.ActorRef import akka.dispatch.ExecutionContexts @@ -1059,10 +1060,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWithin(n: Int, d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = groupedWithin(n, d.asScala) - } /** * Chunk up this stream into groups of elements received within a time window, @@ -1105,10 +1104,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Flow[In, java.util.List[Out], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) - } /** * Shifts elements emission in time by a specified amount. It allows to store elements @@ -1165,10 +1162,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = delay(of.asScala, strategy) - } /** * Discard the given number of elements at the beginning of the stream. @@ -1212,10 +1207,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def dropWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = dropWithin(d.asScala) - } /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -1434,10 +1427,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * See also [[Flow.limit]], [[Flow.limitWeighted]] */ - def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def takeWithin(d: java.time.Duration): javadsl.Flow[In, Out, Mat] = takeWithin(d.asScala) - } /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -2389,10 +2380,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = initialTimeout(timeout.asScala) - } /** * If the completion of the stream does not happen until the provided timeout, the stream is failed @@ -2423,10 +2412,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def completionTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = completionTimeout(timeout.asScala) - } /** * If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -2459,10 +2446,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def idleTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = idleTimeout(timeout.asScala) - } /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -2495,10 +2480,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def backpressureTimeout(timeout: java.time.Duration): javadsl.Flow[In, Out, Mat] = backpressureTimeout(timeout.asScala) - } /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this @@ -2539,10 +2522,41 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) - } + + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(elements: Int, per: java.time.Duration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(elements, per.asScala)) /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate @@ -2565,10 +2579,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2578,7 +2593,6 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -2607,10 +2621,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2620,13 +2635,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, - mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(elements, per.asScala, maximumBurst, mode) - } + mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(elements, per.asScala, maximumBurst, mode)) /** * Sends elements downstream with speed limited to `cost/per`. Cost is @@ -2652,10 +2664,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2665,7 +2678,6 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -2673,6 +2685,42 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(cost, per.asScala, costCalculation.apply)) + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -2697,10 +2745,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2710,13 +2759,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(cost, per.asScala, maximumBurst, costCalculation, mode) - } + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2731,22 +2777,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = - new Flow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) - - /** - * This is a simplified version of throttle that spreads events evenly across the given time interval. - * - * Use this combinator when you need just slow down a stream without worrying about exact amount - * of time between events. - * - * If you want to be sure that no time interval has no more than specified number of events you need to use - * [[throttle()]] with maximumBurst attribute. - * @see [[#throttle]] - */ - def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttleEven(elements, per.asScala, mode) - } + new Flow(delegate.throttleEven(elements, per, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2759,10 +2790,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") - def throttleEven(cost: Int, per: FiniteDuration, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = - new Flow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + throttleEven(elements, per.asScala, mode) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2774,11 +2804,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") + def throttleEven(cost: Int, per: FiniteDuration, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttleEven(cost, per, costCalculation.apply, mode)) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: java.time.Duration, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = throttleEven(cost, per.asScala, costCalculation, mode) - } /** * Detaches upstream demand from downstream demand without detaching the @@ -2839,10 +2885,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialDelay(delay: java.time.Duration): javadsl.Flow[In, Out, Mat] = initialDelay(delay.asScala) - } /** * Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9d747ccf43..3f49e9787e 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -13,6 +13,7 @@ import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.stream.impl.{ LinearTraversalBuilder, SourceQueueAdapter } import akka.util.{ ConstantFun, Timeout } +import akka.util.JavaDurationConverters._ import akka.{ Done, NotUsed } import org.reactivestreams.{ Publisher, Subscriber } @@ -216,10 +217,8 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = { - import akka.util.JavaDurationConverters._ + def tick[O](initialDelay: java.time.Duration, interval: java.time.Duration, tick: O): javadsl.Source[O, Cancellable] = Source.tick(initialDelay.asScala, interval.asScala, tick) - } /** * Create a `Source` with one element. @@ -1730,10 +1729,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWithin(n: Int, d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) - } /** * Chunk up this stream into groups of elements received within a time window, @@ -1776,10 +1773,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) - } /** * Shifts elements emission in time by a specified amount. It allows to store elements @@ -1836,10 +1831,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] = delay(of.asScala, strategy) - } /** * Discard the given number of elements at the beginning of the stream. @@ -1883,10 +1876,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def dropWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = dropWithin(d.asScala) - } /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -1984,10 +1975,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels or timer fires */ - def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def takeWithin(d: java.time.Duration): javadsl.Source[Out, Mat] = takeWithin(d.asScala) - } /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -2441,10 +2430,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = initialTimeout(timeout.asScala) - } /** * If the completion of the stream does not happen until the provided timeout, the stream is failed @@ -2475,10 +2462,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def completionTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = completionTimeout(timeout.asScala) - } /** * If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -2511,10 +2496,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def idleTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = idleTimeout(timeout.asScala) - } /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -2547,10 +2530,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def backpressureTimeout(timeout: java.time.Duration): javadsl.Source[Out, Mat] = backpressureTimeout(timeout.asScala) - } /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this @@ -2591,10 +2572,41 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) - } + + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(elements: Int, per: java.time.Duration): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(elements, per.asScala)) /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate @@ -2617,10 +2629,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2630,7 +2643,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -2659,10 +2671,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2672,13 +2685,46 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, - mode: ThrottleMode): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(elements, per.asScala, maximumBurst, mode) - } + mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(elements, per.asScala, maximumBurst, mode)) + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(cost, per.asScala, costCalculation.apply _)) /** * Sends elements downstream with speed limited to `cost/per`. Cost is @@ -2704,10 +2750,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2717,7 +2764,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -2749,10 +2795,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2762,13 +2809,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(cost, per.asScala, maximumBurst, costCalculation, mode) - } + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply _, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2781,24 +2825,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.Source[Out, Mat] = - new Source(delegate.throttle(elements, per, Int.MaxValue, mode)) - - /** - * This is a simplified version of throttle that spreads events evenly across the given time interval. - * - * Use this combinator when you need just slow down a stream without worrying about exact amount - * of time between events. - * - * If you want to be sure that no time interval has no more than specified number of events you need to use - * [[throttle()]] with maximumBurst attribute. - * @see [[#throttle]] - */ - def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttleEven(elements, per.asScala, mode) - } + new Source(delegate.throttleEven(elements, per, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2811,10 +2840,25 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.Source[Out, Mat] = + throttleEven(elements, per.asScala, mode) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = - new Source(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + new Source(delegate.throttleEven(cost, per, costCalculation.apply _, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2826,11 +2870,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: java.time.Duration, - costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.Source[Out, Mat] = throttleEven(cost, per.asScala, costCalculation, mode) - } /** * Detaches upstream demand from downstream demand without detaching the @@ -2891,10 +2935,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialDelay(delay: java.time.Duration): javadsl.Source[Out, Mat] = initialDelay(delay.asScala) - } /** * Replace the attributes of this [[Source]] with the given ones. If this Source is a composite diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 615389ef55..a3622aaf00 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -9,6 +9,7 @@ import akka.event.LoggingAdapter import akka.japi.function import akka.stream._ import akka.util.ConstantFun +import akka.util.JavaDurationConverters._ import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance @@ -629,10 +630,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWithin(n: Int, d: java.time.Duration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) - } /** * Chunk up this stream into groups of elements received within a time window, @@ -675,10 +674,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) - } /** * Shifts elements emission in time by a specified amount. It allows to store elements @@ -735,10 +732,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = delay(of.asScala, strategy) - } /** * Discard the given number of elements at the beginning of the stream. @@ -782,10 +777,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def dropWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = dropWithin(d.asScala) - } /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -995,10 +988,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels or timer fires */ - def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def takeWithin(d: java.time.Duration): SubFlow[In, Out, Mat] = takeWithin(d.asScala) - } /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -1531,10 +1522,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = initialTimeout(timeout.asScala) - } /** * If the completion of the stream does not happen until the provided timeout, the stream is failed @@ -1565,10 +1554,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def completionTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = completionTimeout(timeout.asScala) - } /** * If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -1601,10 +1588,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def idleTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = idleTimeout(timeout.asScala) - } /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -1637,10 +1622,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def backpressureTimeout(timeout: java.time.Duration): SubFlow[In, Out, Mat] = backpressureTimeout(timeout.asScala) - } /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this @@ -1681,10 +1664,41 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) - } + + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(elements: Int, per: java.time.Duration): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(elements, per.asScala)) /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate @@ -1707,10 +1721,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1720,7 +1735,6 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -1749,10 +1763,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1762,13 +1777,46 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, - mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(elements, per.asScala, maximumBurst, mode) - } + mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(elements, per.asScala, maximumBurst, mode)) + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(cost, per.asScala, costCalculation.apply)) /** * Sends elements downstream with speed limited to `cost/per`. Cost is @@ -1794,10 +1842,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1807,7 +1856,6 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -1839,10 +1887,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1852,13 +1901,10 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(cost, per.asScala, maximumBurst, costCalculation, mode) - } + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = + new SubFlow(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1871,24 +1917,9 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = - new SubFlow(delegate.throttle(elements, per, Integer.MAX_VALUE, mode)) - - /** - * This is a simplified version of throttle that spreads events evenly across the given time interval. - * - * Use this combinator when you need just slow down a stream without worrying about exact amount - * of time between events. - * - * If you want to be sure that no time interval has no more than specified number of events you need to use - * [[throttle()]] with maximumBurst attribute. - * @see [[#throttle]] - */ - def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttleEven(elements, per.asScala, mode) - } + new SubFlow(delegate.throttleEven(elements, per, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1901,10 +1932,25 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = + throttleEven(elements, per.asScala, mode) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = - new SubFlow(delegate.throttle(cost, per, Integer.MAX_VALUE, costCalculation.apply, mode)) + new SubFlow(delegate.throttleEven(cost, per, costCalculation.apply, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1916,11 +1962,11 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: java.time.Duration, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = throttleEven(cost, per.asScala, costCalculation, mode) - } /** * Detaches upstream demand from downstream demand without detaching the @@ -1963,10 +2009,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels */ - def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialDelay(delay: java.time.Duration): SubFlow[In, Out, Mat] = initialDelay(delay.asScala) - } /** * Change the attributes of this [[Source]] to the given ones and seal the list diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 185ad2fc2e..3975dc295b 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -10,6 +10,7 @@ import akka.japi.function import akka.japi.Util import akka.stream._ import akka.util.ConstantFun +import akka.util.JavaDurationConverters._ import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance @@ -620,10 +621,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWithin(n: Int, d: java.time.Duration): SubSource[java.util.List[Out @uncheckedVariance], Mat] = groupedWithin(n, d.asScala) - } /** * Chunk up this stream into groups of elements received within a time window, @@ -666,10 +665,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = { - import akka.util.JavaDurationConverters._ + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: java.time.Duration): javadsl.SubSource[java.util.List[Out @uncheckedVariance], Mat] = groupedWeightedWithin(maxWeight, costFn, d.asScala) - } /** * Discard the given number of elements at the beginning of the stream. @@ -713,10 +710,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def dropWithin(d: java.time.Duration): SubSource[Out, Mat] = dropWithin(d.asScala) - } /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -831,10 +826,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * @param of time to shift all messages * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = delay(of.asScala, strategy) - } /** * Recover allows to send last element on failure and gracefully complete the stream @@ -980,10 +973,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels or timer fires */ - def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def takeWithin(d: java.time.Duration): SubSource[Out, Mat] = takeWithin(d.asScala) - } /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -1516,10 +1507,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = initialTimeout(timeout.asScala) - } /** * If the completion of the stream does not happen until the provided timeout, the stream is failed @@ -1550,10 +1539,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def completionTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = completionTimeout(timeout.asScala) - } /** * If the time between two processed elements exceeds the provided timeout, the stream is failed @@ -1586,10 +1573,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def idleTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = idleTimeout(timeout.asScala) - } /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, @@ -1622,10 +1607,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def backpressureTimeout(timeout: java.time.Duration): SubSource[Out, Mat] = backpressureTimeout(timeout.asScala) - } /** * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this @@ -1666,10 +1649,41 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def keepAlive(maxIdle: java.time.Duration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = keepAlive(maxIdle.asScala, injectedElem) - } + + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(elements: Int, per: java.time.Duration): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(elements, per.asScala)) /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate @@ -1692,10 +1706,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1705,7 +1720,6 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -1734,10 +1748,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1747,13 +1762,46 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(elements: Int, per: java.time.Duration, maximumBurst: Int, - mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(elements, per.asScala, maximumBurst, mode) - } + mode: ThrottleMode): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(elements, per.asScala, maximumBurst, mode)) + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(cost: Int, per: java.time.Duration, + costCalculation: function.Function[Out, Integer]): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(cost, per.asScala, costCalculation.apply _)) /** * Sends elements downstream with speed limited to `cost/per`. Cost is @@ -1779,10 +1827,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1792,7 +1841,6 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ @Deprecated @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") @@ -1824,10 +1872,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -1837,13 +1886,10 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(cost: Int, per: java.time.Duration, maximumBurst: Int, - costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttle(cost, per.asScala, maximumBurst, costCalculation, mode) - } + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = + new SubSource(delegate.throttle(cost, per.asScala, maximumBurst, costCalculation.apply _, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1856,24 +1902,9 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = - new SubSource(delegate.throttle(elements, per, Int.MaxValue, mode)) - - /** - * This is a simplified version of throttle that spreads events evenly across the given time interval. - * - * Use this combinator when you need just slow down a stream without worrying about exact amount - * of time between events. - * - * If you want to be sure that no time interval has no more than specified number of events you need to use - * [[throttle()]] with maximumBurst attribute. - * @see [[#throttle]] - */ - def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ - throttleEven(elements, per.asScala, mode) - } + new SubSource(delegate.throttleEven(elements, per, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1886,10 +1917,25 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * @see [[#throttle]] */ @Deprecated - @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") + def throttleEven(elements: Int, per: java.time.Duration, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = + throttleEven(elements, per.asScala, mode) + + /** + * This is a simplified version of throttle that spreads events evenly across the given time interval. + * + * Use this combinator when you need just slow down a stream without worrying about exact amount + * of time between events. + * + * If you want to be sure that no time interval has no more than specified number of events you need to use + * [[throttle()]] with maximumBurst attribute. + * @see [[#throttle]] + */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = - new SubSource(delegate.throttle(cost, per, Int.MaxValue, costCalculation.apply _, mode)) + new SubSource(delegate.throttleEven(cost, per, costCalculation.apply _, mode)) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -1901,11 +1947,11 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: java.time.Duration, - costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): javadsl.SubSource[Out, Mat] = throttleEven(cost, per.asScala, costCalculation, mode) - } /** * Detaches upstream demand from downstream demand without detaching the @@ -1948,10 +1994,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels */ - def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = { - import akka.util.JavaDurationConverters._ + def initialDelay(delay: java.time.Duration): SubSource[Out, Mat] = initialDelay(delay.asScala) - } /** * Change the attributes of this [[Source]] to the given ones and seal the list diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7f6313bc1d..c8b93cca6b 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2047,6 +2047,38 @@ trait FlowOps[+Out, +Mat] { def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () ⇒ U): Repr[U] = via(new Timers.IdleInject[Out, U](maxIdle, injectedElem)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(elements: Int, per: FiniteDuration): Repr[Out] = + throttle(elements, per, maximumBurst = Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, ThrottleMode.Shaping) + /** * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate * for emitting messages. This combinator works for streams where all elements have the same cost or length. @@ -2069,10 +2101,11 @@ trait FlowOps[+Out, +Mat] { * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2082,11 +2115,45 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] = throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode) + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstiness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element costs. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. Bucket is full when stream just materialized and + * started. + * + * The burst size is calculated based on the given rate (`cost/per`) as 0.1 * rate, for example: + * - rate < 20/second => burst size 1 + * - rate 20/second => burst size 2 + * - rate 100/second => burst size 10 + * - rate 200/second => burst size 20 + * + * The throttle `mode` is [[akka.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to + * meet throttle rate. + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + */ + def throttle(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int): Repr[Out] = + via(new Throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, ThrottleMode.Shaping)) + /** * Sends elements downstream with speed limited to `cost/per`. Cost is * calculating for each element individually by calling `calculateCost` function. @@ -2111,10 +2178,11 @@ trait FlowOps[+Out, +Mat] { * * WARNING: Be aware that throttle is using scheduler to slow down the stream. This scheduler has minimal time of triggering * next push. Consequently it will slow down the stream as it has minimal pause for emitting. This can happen in - * case burst is 0 and speed is higher than 30 events per second. You need to consider another solution in case you are expecting - * events being evenly spread with some small interval (30 milliseconds or less). - * In other words the throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it - * enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size. + * case burst is 0 and speed is higher than 30 events per second. You need to increase the `maximumBurst` if + * elements arrive with small interval (30 milliseconds or less). Use the overloaded `throttle` method without + * `maximumBurst` parameter to automatically calculate the `maximumBurst` based on the given rate (`cost/per`). + * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in + * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * * '''Emits when''' upstream emits an element and configured time per each element elapsed * @@ -2124,7 +2192,6 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * @see [[#throttleEven]] */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = @@ -2141,8 +2208,10 @@ trait FlowOps[+Out, +Mat] { * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(elements: Int, per: FiniteDuration, mode: ThrottleMode): Repr[Out] = - throttle(elements, per, Int.MaxValue, ConstantFun.oneInt, mode) + throttle(elements, per, Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, mode) /** * This is a simplified version of throttle that spreads events evenly across the given time interval. @@ -2154,9 +2223,11 @@ trait FlowOps[+Out, +Mat] { * [[throttle()]] with maximumBurst attribute. * @see [[#throttle]] */ + @Deprecated + @deprecated("Use throttle without `maximumBurst` parameter instead.", "2.5.12") def throttleEven(cost: Int, per: FiniteDuration, costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = - via(new Throttle(cost, per, Int.MaxValue, costCalculation, mode)) + throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, mode) /** * Detaches upstream demand from downstream demand without detaching the