+str #18555 add throttle combinator

This commit is contained in:
Alexander Golubev 2015-11-08 19:27:03 -05:00
parent 89461c2b2a
commit ea6488c6c0
7 changed files with 536 additions and 3 deletions

View file

@ -0,0 +1,245 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ThrottleMode.{ Shaping, Enforcing }
import akka.stream._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NoStackTrace
class FlowThrottleSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1))
def genByteString(length: Int) =
ByteString(new Random().shuffle(0 to 255).take(length).map(_.toByte).toArray)
"Throttle for single cost elements" must {
"work for the happy case" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 0, Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
}
"emit single element per tick" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream))
downstream.request(20)
upstream.sendNext(1)
downstream.expectNoMsg(150.millis)
downstream.expectNext(1)
upstream.sendNext(2)
downstream.expectNoMsg(150.millis)
downstream.expectNext(2)
upstream.sendComplete()
downstream.expectComplete()
}
"not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream))
downstream.request(2)
upstream.sendNext(1)
downstream.expectNext(1)
downstream.expectNoMsg(300.millis)
upstream.sendNext(2)
downstream.expectNext(2)
upstream.sendComplete()
}
"cancel when downstream cancels" in Utils.assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).throttle(1, 300.millis, 0, Shaping).runWith(Sink(downstream))
downstream.cancel()
}
"send elements downstream as soon as time comes" in Utils.assertAllStagesStopped {
val probe = Source(1 to 10).throttle(2, 500.millis, 0, Shaping).runWith(TestSink.probe[Int])
.request(5)
probe.receiveWithin(600.millis) should be(Seq(1, 2))
probe.expectNoMsg(100.millis)
.expectNext(3)
.expectNoMsg(100.millis)
.expectNext(4)
.cancel()
}
"burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNoMsg(100.millis)
downstream.expectNext(1)
downstream.request(5)
downstream.expectNoMsg(1200.millis)
for (i 2 to 6) upstream.sendNext(i)
downstream.receiveWithin(300.millis, 5) should be(2 to 6)
downstream.cancel()
}
"burst some elements if have enough time" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(1, 200.millis, 5, Shaping).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNoMsg(100.millis)
downstream.expectNext(1)
downstream.expectNoMsg(500.millis) //wait to receive 2 in burst afterwards
downstream.request(5)
for (i 2 to 4) upstream.sendNext(i)
downstream.receiveWithin(100.millis, 2) should be(Seq(2, 3))
downstream.cancel()
}
"throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped {
an[RateExceededException] shouldBe thrownBy {
Await.result(
Source(1 to 5).throttle(1, 200.millis, 5, Enforcing).runWith(Sink.ignore),
2.seconds)
}
}
"properly combine shape and throttle modes" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 5, Shaping)
.throttle(1, 100.millis, 5, Enforcing)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
}
}
"Throttle for various cost elements" must {
"work for happy case" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(1, 100.millis, 0, (_) 1, Shaping)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
}
"emit elements according to cost" in Utils.assertAllStagesStopped {
val list = (1 to 4).map(_ * 2).map(genByteString)
Source(list).throttle(2, 200.millis, 0, _.length, Shaping)
.runWith(TestSink.probe[ByteString])
.request(4)
.expectNext(list(0))
.expectNoMsg(300.millis)
.expectNext(list(1))
.expectNoMsg(500.millis)
.expectNext(list(2))
.expectNoMsg(700.millis)
.expectNext(list(3))
.expectComplete()
}
"not send downstream if upstream does not emit element" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(2, 300.millis, 0, identity, Shaping).runWith(Sink(downstream))
downstream.request(2)
upstream.sendNext(1)
downstream.expectNext(1)
downstream.expectNoMsg(300.millis)
upstream.sendNext(2)
downstream.expectNext(2)
upstream.sendComplete()
}
"cancel when downstream cancels" in Utils.assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).throttle(2, 200.millis, 0, identity, Shaping).runWith(Sink(downstream))
downstream.cancel()
}
"send elements downstream as soon as time comes" in Utils.assertAllStagesStopped {
val probe = Source(1 to 10).throttle(4, 500.millis, 0, _ 2, Shaping).runWith(TestSink.probe[Int])
.request(5)
probe.receiveWithin(600.millis) should be(Seq(1, 2))
probe.expectNoMsg(100.millis)
.expectNext(3)
.expectNoMsg(100.millis)
.expectNext(4)
.cancel()
}
"burst according to its maximum if enough time passed" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(2, 400.millis, 5, (_) 1, Shaping).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNoMsg(100.millis)
downstream.expectNext(1)
downstream.request(5)
downstream.expectNoMsg(1200.millis)
for (i 2 to 6) upstream.sendNext(i)
downstream.receiveWithin(300.millis, 5) should be(2 to 6)
downstream.cancel()
}
"burst some elements if have enough time" in Utils.assertAllStagesStopped {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).throttle(2, 400.millis, 5, (e) if (e < 4) 1 else 20, Shaping).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNoMsg(100.millis)
downstream.expectNext(1)
downstream.expectNoMsg(500.millis) //wait to receive 2 in burst afterwards
downstream.request(5)
for (i 2 to 4) upstream.sendNext(i)
downstream.receiveWithin(200.millis, 2) should be(Seq(2, 3))
downstream.cancel()
}
"throw exception when exceeding throughtput in enforced mode" in Utils.assertAllStagesStopped {
an[RateExceededException] shouldBe thrownBy {
Await.result(
Source(1 to 5).throttle(2, 200.millis, 0, identity, Enforcing).runWith(Sink.ignore),
2.seconds)
}
}
"properly combine shape and throttle modes" in Utils.assertAllStagesStopped {
Source(1 to 5).throttle(2, 200.millis, 0, identity, Shaping)
.throttle(1, 100.millis, 5, Enforcing)
.runWith(TestSink.probe[Int])
.request(5)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
}
"handle rate calculation function exception" in Utils.assertAllStagesStopped {
val ex = new RuntimeException with NoStackTrace
Source(1 to 5).throttle(2, 200.millis, 0, (_) { throw ex }, Shaping)
.throttle(1, 100.millis, 5, Enforcing)
.runWith(TestSink.probe[Int])
.request(5)
.expectError(ex)
}
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
/**
* Represents a mode that decides how to deal exceed rate for Throttle combinator
*/
sealed abstract class ThrottleMode
object ThrottleMode {
/**
* Tells throttle to make pauses before emitting messages to meet throttle rate
*/
case object Shaping extends ThrottleMode
/**
* Makes throttle fail with exception when upstream is faster than throttle rate
*/
case object Enforcing extends ThrottleMode
}
/**
* Exception that is thrown when rated controlled by stream is exceeded
*/
class RateExceededException(msg: String) extends RuntimeException(msg)

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.ThrottleMode.{ Enforcing, Shaping }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[stream] class Throttle[T](cost: Int,
per: FiniteDuration,
maximumBurst: Int,
costCalculation: (T) Int,
mode: ThrottleMode)
extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
var willStop = false
var lastTokens: Long = 0
var previousTime: Long = 0
val speed = ((cost.toDouble / per.toMillis) * 1024 * 1024).toLong
val timerName: String = "ThrottleTimer"
var currentElement: Option[T] = None
setHandler(in, new InHandler {
val scaledMaximumBurst = scale(maximumBurst)
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage()
override def onPush(): Unit = {
val timeElapsed = now() - previousTime
val currentTokens = Math.min(timeElapsed * speed + lastTokens, scaledMaximumBurst)
val elem = grab(in)
val elementCost = scale(costCalculation(elem))
if (currentTokens < elementCost)
mode match {
case Shaping
currentElement = Some(elem)
scheduleOnce(timerName, ((elementCost - currentTokens) / speed).millis)
case Enforcing failStage(new RateExceededException("Maximum throttle throughput exceeded"))
}
else {
lastTokens = currentTokens - elementCost
previousTime = now()
push(out, elem)
}
}
})
override protected def onTimer(key: Any): Unit = {
push(out, currentElement.get)
currentElement = None
previousTime = now()
lastTokens = 0
if (willStop) completeStage()
}
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
override def preStart(): Unit = previousTime = now()
private def now(): Long = System.currentTimeMillis()
private def scale(e: Int): Long = e.toLong << 20
}
override def toString = "Throttle"
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException } import java.util.concurrent.{ TimeUnit, TimeoutException }

View file

@ -1063,6 +1063,62 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] = def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] =
new Flow(delegate.keepAlive(maxIdle, () injectedElem.create())) new Flow(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(elements, per, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculating for each element individually by calling `calculateCost` function.
* This combinator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -921,6 +921,62 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] = def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] =
new Source(delegate.keepAlive(maxIdle, () injectedElem.create())) new Source(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(elements, per, maximumBurst, mode))
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculating for each element individually by calling `calculateCost` function.
* This combinator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] =
new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -8,9 +8,9 @@ import akka.stream.Attributes._
import akka.stream._ import akka.stream._
import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } import akka.stream.impl._
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUnordered, TakeWithin }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
import akka.stream.stage._ import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -1075,6 +1075,70 @@ trait FlowOps[+Out, +Mat] {
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () U): Repr[U, Mat] = def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () U): Repr[U, Mat] =
via(new Timers.IdleInject[Out, U](maxIdle, injectedElem)) via(new Timers.IdleInject[Out, U](maxIdle, injectedElem))
/**
* Sends elements downstream with speed limited to `elements/per`. In other words, this stage set the maximum rate
* for emitting messages. This combinator works for streams where all elements have the same cost or length.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as number of elements. If there isn't any, throttle waits until the
* bucket accumulates enough tokens.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int,
mode: ThrottleMode): Repr[Out, Mat] = {
require(elements > 0, "elements must be > 0")
require(per.toMillis > 0, "per time must be > 0")
require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")
via(new Throttle(elements, per, maximumBurst, _ 1, mode))
}
/**
* Sends elements downstream with speed limited to `cost/per`. Cost is
* calculating for each element individually by calling `calculateCost` function.
* This combinator works for streams when elements have different cost(length).
* Streams of `ByteString` for example.
*
* Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
* Tokens drops into the bucket at a given rate and can be `spared` for later use up to bucket capacity
* to allow some burstyness. Whenever stream wants to send an element, it takes as many
* tokens from the bucket as element cost. If there isn't any, throttle waits until the
* bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
* to their cost minus available tokens, meeting the target rate.
*
* Parameter `mode` manages behaviour when upstream is faster than throttle rate:
* - [[akka.stream.ThrottleMode.Shaping]] makes pauses before emitting messages to meet throttle rate
* - [[akka.stream.ThrottleMode.Enforcing]] fails with exception when upstream is faster than throttle rate. Enforcing
* cannot emit elements that cost more than the maximumBurst
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int,
costCalculation: (Out) Int, mode: ThrottleMode): Repr[Out, Mat] = {
require(per.toMillis > 0, "per time must be > 0")
require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")
via(new Throttle(cost, per, maximumBurst, costCalculation, mode))
}
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *