diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala new file mode 100644 index 0000000000..792ff7efb4 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThrottleSpec.scala @@ -0,0 +1,245 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ThrottleMode.{ Shaping, Enforcing } +import akka.stream._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink +import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Random +import scala.util.control.NoStackTrace + +class FlowThrottleSpec extends AkkaSpec { + implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1)) + + def genByteString(length: Int) = + ByteString(new Random().shuffle(0 to 255).take(length).map(_.toByte).toArray) + + "Throttle for single cost elements" must { + "work for the happy case" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(1, 100.millis, 0, Shaping) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1, 2, 3, 4, 5) + .expectComplete() + } + + "emit single element per tick" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream)) + + downstream.request(20) + upstream.sendNext(1) + downstream.expectNoMsg(150.millis) + downstream.expectNext(1) + + upstream.sendNext(2) + downstream.expectNoMsg(150.millis) + downstream.expectNext(2) + + upstream.sendComplete() + downstream.expectComplete() + } + + "not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream)) + + downstream.request(2) + upstream.sendNext(1) + downstream.expectNext(1) + + downstream.expectNoMsg(300.millis) + upstream.sendNext(2) + downstream.expectNext(2) + + upstream.sendComplete() + } + + "cancel when downstream cancels" in Utils.assertAllStagesStopped { + val downstream = TestSubscriber.probe[Int]() + Source(1 to 10).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream)) + downstream.cancel() + } + + "send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { + val probe = Source(1 to 10).throttle(2, 500.millis, 0, Shaping).runWith(TestSink.probe[Int]) + .request(5) + probe.receiveWithin(600.millis) should be(Seq(1, 2)) + probe.expectNoMsg(100.millis) + .expectNext(3) + .expectNoMsg(100.millis) + .expectNext(4) + .cancel() + } + + "burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink(downstream)) + downstream.request(1) + upstream.sendNext(1) + downstream.expectNoMsg(100.millis) + downstream.expectNext(1) + downstream.request(5) + downstream.expectNoMsg(1200.millis) + for (i ← 2 to 6) upstream.sendNext(i) + downstream.receiveWithin(300.millis, 5) should be(2 to 6) + downstream.cancel() + } + + "burst some elements if have enough time" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink(downstream)) + downstream.request(1) + upstream.sendNext(1) + downstream.expectNoMsg(100.millis) + downstream.expectNext(1) + downstream.expectNoMsg(500.millis) //wait to receive 2 in burst afterwards + downstream.request(5) + for (i ← 2 to 4) upstream.sendNext(i) + downstream.receiveWithin(100.millis, 2) should be(Seq(2, 3)) + downstream.cancel() + } + + "throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { + an[RateExceededException] shouldBe thrownBy { + Await.result( + Source(1 to 5).throttle(1, 200.millis, 5, Enforcing).runWith(Sink.ignore), + 2.seconds) + } + } + + "properly combine shape and throttle modes" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(1, 100.millis, 5, Shaping) + .throttle(1, 100.millis, 5, Enforcing) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1, 2, 3, 4, 5) + .expectComplete() + } + } + + "Throttle for various cost elements" must { + "work for happy case" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(1, 100.millis, 0, (_) ⇒ 1, Shaping) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1, 2, 3, 4, 5) + .expectComplete() + } + + "emit elements according to cost" in Utils.assertAllStagesStopped { + val list = (1 to 4).map(_ * 2).map(genByteString) + Source(list).throttle(2, 200.millis, 0, _.length, Shaping) + .runWith(TestSink.probe[ByteString]) + .request(4) + .expectNext(list(0)) + .expectNoMsg(300.millis) + .expectNext(list(1)) + .expectNoMsg(500.millis) + .expectNext(list(2)) + .expectNoMsg(700.millis) + .expectNext(list(3)) + .expectComplete() + } + + "not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(2, 300.millis, 0, identity, Shaping).runWith(Sink(downstream)) + + downstream.request(2) + upstream.sendNext(1) + downstream.expectNext(1) + + downstream.expectNoMsg(300.millis) + upstream.sendNext(2) + downstream.expectNext(2) + + upstream.sendComplete() + } + + "cancel when downstream cancels" in Utils.assertAllStagesStopped { + val downstream = TestSubscriber.probe[Int]() + Source(1 to 10).throttle(2, 200.millis, 0, identity, Shaping).runWith(Sink(downstream)) + downstream.cancel() + } + + "send elements downstream as soon as time comes" in Utils.assertAllStagesStopped { + val probe = Source(1 to 10).throttle(4, 500.millis, 0, _ ⇒ 2, Shaping).runWith(TestSink.probe[Int]) + .request(5) + probe.receiveWithin(600.millis) should be(Seq(1, 2)) + probe.expectNoMsg(100.millis) + .expectNext(3) + .expectNoMsg(100.millis) + .expectNext(4) + .cancel() + } + + "burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(2, 400.millis, 5, (_) ⇒ 1, Shaping).runWith(Sink(downstream)) + downstream.request(1) + upstream.sendNext(1) + downstream.expectNoMsg(100.millis) + downstream.expectNext(1) + downstream.request(5) + downstream.expectNoMsg(1200.millis) + for (i ← 2 to 6) upstream.sendNext(i) + downstream.receiveWithin(300.millis, 5) should be(2 to 6) + downstream.cancel() + } + + "burst some elements if have enough time" in Utils.assertAllStagesStopped { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source(upstream).throttle(2, 400.millis, 5, (e) ⇒ if (e < 4) 1 else 20, Shaping).runWith(Sink(downstream)) + downstream.request(1) + upstream.sendNext(1) + downstream.expectNoMsg(100.millis) + downstream.expectNext(1) + downstream.expectNoMsg(500.millis) //wait to receive 2 in burst afterwards + downstream.request(5) + for (i ← 2 to 4) upstream.sendNext(i) + downstream.receiveWithin(200.millis, 2) should be(Seq(2, 3)) + downstream.cancel() + } + + "throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped { + an[RateExceededException] shouldBe thrownBy { + Await.result( + Source(1 to 5).throttle(2, 200.millis, 0, identity, Enforcing).runWith(Sink.ignore), + 2.seconds) + } + } + + "properly combine shape and throttle modes" in Utils.assertAllStagesStopped { + Source(1 to 5).throttle(2, 200.millis, 0, identity, Shaping) + .throttle(1, 100.millis, 5, Enforcing) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNext(1, 2, 3, 4, 5) + .expectComplete() + } + + "handle rate calculation function exception" in Utils.assertAllStagesStopped { + val ex = new RuntimeException with NoStackTrace + Source(1 to 5).throttle(2, 200.millis, 0, (_) ⇒ { throw ex }, Shaping) + .throttle(1, 100.millis, 5, Enforcing) + .runWith(TestSink.probe[Int]) + .request(5) + .expectError(ex) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala new file mode 100644 index 0000000000..f3c65023b7 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +/** + * Represents a mode that decides how to deal exceed rate for Throttle combinator + */ +sealed abstract class ThrottleMode + +object ThrottleMode { + + /** + * Tells throttle to make pauses before emitting messages to meet throttle rate + */ + case object Shaping extends ThrottleMode + + /** + * Makes throttle fail with exception when upstream is faster than throttle rate + */ + case object Enforcing extends ThrottleMode + +} + +/** + * Exception that is thrown when rated controlled by stream is exceeded + */ +class RateExceededException(msg: String) extends RuntimeException(msg) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala new file mode 100644 index 0000000000..797fff8650 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.ThrottleMode.{ Enforcing, Shaping } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.stage._ +import akka.stream._ + +import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +private[stream] class Throttle[T](cost: Int, + per: FiniteDuration, + maximumBurst: Int, + costCalculation: (T) ⇒ Int, + mode: ThrottleMode) + extends SimpleLinearGraphStage[T] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + var willStop = false + var lastTokens: Long = 0 + var previousTime: Long = 0 + + val speed = ((cost.toDouble / per.toMillis) * 1024 * 1024).toLong + val timerName: String = "ThrottleTimer" + + var currentElement: Option[T] = None + + setHandler(in, new InHandler { + val scaledMaximumBurst = scale(maximumBurst) + + override def onUpstreamFinish(): Unit = + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + + override def onPush(): Unit = { + val timeElapsed = now() - previousTime + val currentTokens = Math.min(timeElapsed * speed + lastTokens, scaledMaximumBurst) + val elem = grab(in) + val elementCost = scale(costCalculation(elem)) + if (currentTokens < elementCost) + mode match { + case Shaping ⇒ + currentElement = Some(elem) + scheduleOnce(timerName, ((elementCost - currentTokens) / speed).millis) + case Enforcing ⇒ failStage(new RateExceededException("Maximum throttle throughput exceeded")) + } + else { + lastTokens = currentTokens - elementCost + previousTime = now() + push(out, elem) + } + } + }) + + override protected def onTimer(key: Any): Unit = { + push(out, currentElement.get) + currentElement = None + previousTime = now() + lastTokens = 0 + if (willStop) completeStage() + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + + override def preStart(): Unit = previousTime = now() + + private def now(): Long = System.currentTimeMillis() + + private def scale(e: Int): Long = e.toLong << 20 + } + + override def toString = "Throttle" +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index edb1904fcf..db97a5f3e3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ package akka.stream.impl import java.util.concurrent.{ TimeUnit, TimeoutException } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4b939b5bdb..21886be5a9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1063,6 +1063,62 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] = new Flow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as number of elements. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, + mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(elements, per, maximumBurst, mode)) + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 95a7829146..04cb033fc2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -921,6 +921,62 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] = new Source(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as number of elements. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, + mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(elements, per, maximumBurst, mode)) + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, + costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = + new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b5708b9308..f8e6dc0345 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -8,9 +8,9 @@ import akka.stream.Attributes._ import akka.stream._ import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } -import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } -import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } +import akka.stream.impl._ +import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUnordered, TakeWithin } +import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -1075,6 +1075,70 @@ trait FlowOps[+Out, +Mat] { def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () ⇒ U): Repr[U, Mat] = via(new Timers.IdleInject[Out, U](maxIdle, injectedElem)) + /** + * Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate + * for emitting messages. This combinator works for streams where all elements have the same cost or length. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as number of elements. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, + mode: ThrottleMode): Repr[Out, Mat] = { + require(elements > 0, "elements must be > 0") + require(per.toMillis > 0, "per time must be > 0") + require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") + via(new Throttle(elements, per, maximumBurst, _ ⇒ 1, mode)) + } + + /** + * Sends elements downstream with speed limited to `cost/per`. Cost is + * calculating for each element individually by calling `calculateCost` function. + * This combinator works for streams when elements have different cost(length). + * Streams of `ByteString` for example. + * + * Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). + * Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity + * to allow some burstyness. Whenever stream wants to send an element, it takes as many + * tokens from the bucket as element cost. If there isn't any, throttle waits until the + * bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally + * to their cost minus available tokens, meeting the target rate. + * + * Parameter `mode` manages behaviour when upstream is faster than throttle rate: + * - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate + * - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing + * cannot emit elements that cost more than the maximumBurst + * + * '''Emits when''' upstream emits an element and configured time per each element elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out, Mat] = { + require(per.toMillis > 0, "per time must be > 0") + require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") + via(new Throttle(cost, per, maximumBurst, costCalculation, mode)) + } + /** * Delays the initial element by the specified duration. *