From 0b2d3df67e2fea0935880b9650cb937d6c8233a0 Mon Sep 17 00:00:00 2001 From: "Kaspar Fischer (hbf)" Date: Wed, 31 Oct 2012 13:17:59 +0100 Subject: [PATCH] contribution module throttle --- akka-contrib/docs/index.rst | 1 + akka-contrib/docs/throttle.rst | 60 ++++ akka-contrib/docs/throttler.png | Bin 0 -> 4036 bytes .../throttle/TimerBasedThrottler.scala | 294 ++++++++++++++++++ .../throttle/TimerBasedThrottlerSpec.scala | 144 +++++++++ 5 files changed, 499 insertions(+) create mode 100644 akka-contrib/docs/throttle.rst create mode 100644 akka-contrib/docs/throttler.png create mode 100644 akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala create mode 100644 akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index 9f5b57c513..5303c21c6d 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -29,6 +29,7 @@ The Current List of Modules .. toctree:: reliable-proxy + throttle Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-contrib/docs/throttle.rst b/akka-contrib/docs/throttle.rst new file mode 100644 index 0000000000..ab60fb6b96 --- /dev/null +++ b/akka-contrib/docs/throttle.rst @@ -0,0 +1,60 @@ +Throttling Actor Messages +========================= + +Introduction +------------ + +Suppose you are writing an application that makes HTTP requests to an external +web service and that this web service has a restriction in place: you may not +make more than 10 requests in 1 minute. You will get blocked or need to pay if +you don’t stay under this limit. In such a scenario you will want to employ +a *message throttler*. + +This extension module provides a simple implementation of a throttling actor, +the :class:`TimerBasedThrottler`. + + +How to use it +------------- + +You can use a :class:`TimerBasedThrottler` as follows: + +.. includecode:: @contribSrc@/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala#demo-code + +Please refer to the ScalaDoc documentation for the details. + + +The guarantees +-------------- + +:class:`TimerBasedThrottler` uses a timer internally. When the throttler’s rate is 3 msg/s, +for example, the throttler will start a timer that triggers +every second and each time will give the throttler exactly three "vouchers"; +each voucher gives the throttler a right to deliver a message. In this way, +at most 3 messages will be sent out by the throttler in each interval. + +It should be noted that such timer-based throttlers provide relatively **weak guarantees**: + +* Only *start times* are taken into account. This may be a problem if, for example, the + throttler is used to throttle requests to an external web service. If a web request + takes very long on the server then the rate *observed on the server* may be higher. +* A timer-based throttler only makes guarantees for the intervals of its own timer. In + our example, no more than 3 messages are delivered within such intervals. Other + intervals on the timeline, however, may contain more calls. + +The two cases are illustrated in the two figures below, each showing a timeline and three +intervals of the timer. The message delivery times chosen by the throttler are indicated +by dots, and as you can see, each interval contains at most 3 point, so the throttler +works correctly. Still, there is in each example an interval (the red one) that is +problematic. In the first scenario, this is because the delivery times are merely the +start times of longer requests (indicated by the four bars above the timeline that start +at the dots), so that the server observes four requests during the red interval. In the +second scenario, the messages are centered around one of the points in time where the +timer triggers, causing the red interval to contain too many messages. + +.. image:: throttler.png + +For some application scenarios, the guarantees provided by a timer-based throttler might +be too weak. Charles Cordingley’s `blog post `_ +discusses a throttler with stronger guarantees (it solves problem 2 from above). +Future versions of this module may feature throttlers with better guarantees. \ No newline at end of file diff --git a/akka-contrib/docs/throttler.png b/akka-contrib/docs/throttler.png new file mode 100644 index 0000000000000000000000000000000000000000..eab1a52a34e7435e1e6cafcf3e6c511dab45f078 GIT binary patch literal 4036 zcmeAS@N?(olHy`uVBq!ia0y~yV6V=e zG&%VkzHsT1)8xie3?lpj*Pc5R{4na8>)4^_P?Q-cIpySu6rUOW2FZOa%+3FU*x07o za~MvJV{2oz;^FxdESQwl#>2z(i-+g;GX{pk>jUxw3M6c`&)d_*F{I+w+c})!BFCj1?w=QObu483&Z=y{wMbvXyQ!0_LrA&lb*EIz3J$In z9f8f z-E_m`HjUo$(xvSeMK6e6xcYO&`t|w}MqJ$7FB3~x8A@0k!WqD*ed)4gYI&cZefNL* z^l6FOu0t0-1qWVSP{UsQ@{y~Ra4cgiW9-7|eSLkm4ldZcckVeCs6GaeDW>9g@7`Ug zsQ7+|$a7qXpUw3tq2K zH#RQ*x2Mu?(kdPg2->+kRG?q0R=dfktMUwY4;J{{bfn~`Bq zo6Ik3rJ{RxU+wObrC0ZVj@g;i`cgkDF>zs5&Z&dV%kp!Qk``sXSsT6G=l+eQjEt9p zt`|POP}(w4YMy<4U2I55KtRBkU8UkhMfZN1FD)xwyK?2t&FTIb9v%@H8Hc9FU%P(& zd;E!BX>+sV$JTLh{YtsIN?B6Uf4<$@{hywno7>&h6EgMxw?nOoza8IXT-dsFsj97R z?$uSHMde42AHBP$a`XHLn>KCAjo!|-K1TB2$Kyqfa<)}1>-d&s2M13+nzXUXQUAdG z|B1z)7rfB&ZYWGXF2z0F``_12;j+#Dzg~}jy?F6jt;d$Rw_exmsQT^Jx|5irgh5wG2!;{@w?Yw`K2HE ziSO`~sq1#`+_}Cw_51Jl`?sH!lb6pozma@tN%i-4H+Q{#{W#e?PR{(W!LhzI>x?`F z@4qptf8Hk>c5r>Qbj_aC*-sDDy=qveeBOMy|LR?p`Vj$PQCgw8nwm51^XFMOo_53QGI{l;L@zampWak)F7CpGM)Ol8#&-``gRHs+0IG;Mz zMXoT&&-U)=)qD5Gn*aas@bD_HBE^yob^p{IM$_gMSX>E4fu2M_*bIqiHwLZ{p0`A^;j zQc=BUrx)&<|6q0adZ}jn?oP=yuV1g&wQTwF)5UjZs8v_22$2_HI_3ZjLHw;T-UZq8!|NHyy!}fn)3tT~)wQ*~ubzGMe0kolSFf_>?^~^*efxXlZm+|sUTfqJY!aKw8Ig7W zg4Xw$JuS!fgocJjoqm0Ny?5%rKcCNcY|XxUa0ajRnscQ##=*5)ZX~T-xiWC-_3a5| zW!rXMUpH~$#8+=FUc9KQt9$9spFcJIo|CUQz7&wv)DC@myXE6!{n$Mkckf-hCMGVv zT&cOa`DRuTPwdl!Ys3w*n8W@@UrSE9Uj2?Iz5a7=;VIqi-Tyu}?Om&|w|3F7I9r_) zJ=?c!ySD8U_p|fcqbn;P?Qwo8>m6Qc_VVA9hk@s_-{hTs`|kF`;JGW^y-!AjMRl#? z`@AnC_|%sp-=gQ-xf8Rmyi_H&_4)bv>1k9MWxbw5*!bJqNyq{{g3-^aqjzyIUIQ-aEWeoCFm{QKvx zf7L%ft)DdMSN(HGEq(577bZ*ypL+gO)$>!&w~sx^;W!ofofe z{5gJm_l>(}C4+t@aZi_yy(k*X7~9@@OZVf|iGt@QyefP5qzI|h2N{CRV3bM$q6l2i zKxzqm0uVXWS_L9TqyZRKKy>3Z@pVGrrPFh*%YE+eT)TGds*S$BzRU98?5+MDl6?F2 zZQuJhR;|*?%*@POd9t+j=clEoXBbWjTNjb|t2-(pVujS~q7MsRO3$7#V}(@p>ub7~ z&C4REf?lXYH!5>1al#PnSMq_Uy91-NcEGM^n<$ zZn1d?t)A<3{ajyPYGdQJzXsQis{TJEx;kOz@*8g>=DT*^x^?Tq)s|hya&9b;GD`pU z=JE0V(+%5pK}}N@_tl6AD>I&-Z$JOt)1`eU&Q6>+ZCmp1 zvf9aWtxCO;UcGvCtXFom?70&sI{Nyqna}s>nd^T2%9W7PZztvC+Jk~qYXz^FfBg9A zi^;pXy%ism?%mn>YB9h4pATnd8cV)>X`~wH{_V}p$9r~7n=(b_pZc!orKkCGv4LXWG)6|H;|)VC{h$rEOF zzKoAyt?j3c{f^$*QRpl?$FO$Sl+8*S9)EY5D=u8#zu3KRMYGYZITCJL*VYL~%Z^utR;*cCKDk}O_xaQ}l)Z~JYlqU?}U^;KPT4^y^#q?fZAj{^a@d;cGvA z{<`26u{P4Lu zmc>_YKE1z6{?@Hqe}0>Bq{OaO4*soo$|?W7y8pZt4-Pdk?Oks-;j-ud)h9Q+b>F&m zYmRBJ)wg$dS4$b)yBeC))7?G$&BKL#YhpJ)>y7sP^=w+qBy%-2w^J9tPh%-5DLFIC zGFUb;XJZHRZ1enU(Y`lt-V{>S)~j`y)>ovw?yYv%q#y5XqJ1qbEq6W44qJb3Tj}a| zw)0)Pzqsv6`nf4}ec8f2d$#4>+@zbpyiT+s~9 zi7(I1$++-YA$d~!+^IKSWX-#-a%A=TNt2fO%v*Cmv7^6#dg>Cs7ynH;XLfq-iGIH% zYSI+trJ3QPMov=Sr>XhQ(U6PUt2S}^^yT|!PB1Ax*7N!KdH(wRo*nDz^S{ouSjZk) zsG0Qk*7d#XuH3n^C-HBU?c$H0zWDfiOD|ac;?)6xfB5IT`>|U> T<$x{&0|SGntDnm{r-UW|g*xXE literal 0 HcmV?d00001 diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala new file mode 100644 index 0000000000..12e98e89b2 --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala @@ -0,0 +1,294 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.contrib.throttle + +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.util.control.NonFatal +import scala.collection.immutable.{ Queue ⇒ Q } +import akka.actor.{ ActorRef, Actor, FSM } +import Throttler._ +import TimerBasedThrottler._ +import java.util.concurrent.TimeUnit +import akka.AkkaException + +/** + * Marker trait for throttlers. + * + * == Throttling == + * A throttler is an actor that is defined through a target actor and a rate + * (of type [[akka.contrib.throttle.Throttler.Rate]]). You set or change the target and rate at any time through the `SetTarget(target)` + * and `SetRate(rate)` messages, respectively. When you send the throttler any other message `msg`, it will + * put the message `msg` into an internal queue and eventually send all queued messages to the target, at + * a speed that respects the given rate. If no target is currently defined then the messages will be queued + * and will be delivered as soon as a target gets set. + * + * A [[akka.contrib.throttle.Throttler]] understands actor messages of type + * [[akka.contrib.throttle.Throttler.SetTarget]], [[akka.contrib.throttle.Throttler.SetRate]], in + * addition to any other messages, which the throttler will consider as messages to be sent to + * the target. + * + * == Transparency == + * Notice that the throttler `forward`s messages, i.e., the target will see the original message sender (and not the throttler) as the sender of the message. + * + * == Persistence == + * Throttlers usually use an internal queue to keep the messages that need to be sent to the target. + * You therefore cannot rely on the throttler's inbox size in order to learn how much messages are + * outstanding. + * + * It is left to the implementation whether the internal queue is persisted over application restarts or + * actor failure. + * + * == Processing messages == + * The target should process messages as fast as possible. If the target requires substantial time to + * process messages, it should distribute its work to other actors (using for example something like + * a `BalancingDispatcher`), otherwise the resulting system will always work below + * the threshold rate. + * + * Example: Suppose the throttler has a rate of 3msg/s and the target requires 1s to process a message. + * This system will only process messages at a rate of 1msg/s: the target will receive messages at at most 3msg/s + * but as it handles them synchronously and each of them takes 1s, its inbox will grow and grow. In such + * a situation, the target should distribute its messages to a set of worker actors so that individual messages + * can be handled in parallel. + * + * @see [[akka.contrib.throttle.TimerBasedThrottler]] + */ +trait Throttler { self: Actor ⇒ } + +/** + * Message types understood by [[akka.contrib.throttle.Throttler]]'s. + * + * @see [[akka.contrib.throttle.Throttler]] + * @see [[akka.contrib.throttle.Throttler.Rate]] + */ +object Throttler { + /** + * A rate used for throttling. + * + * There are some shorthands available to construct rates: + * {{{ + * import java.util.concurrent.TimeUnit._ + * import scala.concurrent.duration.{ Duration, FiniteDuration } + * + * val rate1 = 1 msgsPer (1, SECONDS) + * val rate2 = 1 msgsPer Duration(1, SECONDS) + * val rate3 = 1 msgsPer (1 seconds) + * val rate4 = 1 msgsPerSecond + * val rate5 = 1 msgsPerMinute + * val rate6 = 1 msgsPerHour + * }}} + * + * @param numberOfCalls the number of calls that may take place in a period + * @param duration the length of the period + * @see [[akka.contrib.throttle.Throttler]] + */ + case class Rate(val numberOfCalls: Int, val duration: FiniteDuration) { + /** + * The duration in milliseconds. + */ + def durationInMillis(): Long = duration.toMillis + } + + /** + * Set the target of a [[akka.contrib.throttle.Throttler]]. + * + * You may change a throttler's target at any time. + * + * Notice that the messages sent by the throttler to the target will have the original sender (and + * not the throttler) as the sender. (In Akka terms, the throttler `forward`s the message.) + * + * @param target if `target` is `None`, the throttler will stop delivering messages and the messages already received + * but not yet delivered, as well as any messages received in the future will be queued + * and eventually be delivered when a new target is set. If `target` is not `None`, the currently queued messages + * as well as any messages received in the the future will be delivered to the new target at a rate not exceeding the current throttler's rate. + */ + case class SetTarget(target: Option[ActorRef]) + + /** + * Set the rate of a [[akka.contrib.throttle.Throttler]]. + * + * You may change a throttler's rate at any time. + * + * @param rate the rate at which messages will be delivered to the target of the throttler + */ + case class SetRate(rate: Rate) + + import language.implicitConversions + + /** + * Helper for some syntactic sugar. + * + * @see [[akka.contrib.throttle.Throttler.Rate]] + */ + implicit class RateInt(numberOfCalls: Int) { + def msgsPer(duration: Int, timeUnit: TimeUnit) = Rate(numberOfCalls, Duration(duration, timeUnit)) + def msgsPer(duration: FiniteDuration) = Rate(numberOfCalls, duration) + def msgsPerSecond = Rate(numberOfCalls, Duration(1, TimeUnit.SECONDS)) + def msgsPerMinute = Rate(numberOfCalls, Duration(1, TimeUnit.MINUTES)) + def msgsPerHour = Rate(numberOfCalls, Duration(1, TimeUnit.HOURS)) + } + +} + +/** + * Implementation-specific internals. + */ +object TimerBasedThrottler { + private[throttle] case object Tick + + // States of the FSM + private[throttle] sealed trait State + // Idle means we don't deliver messages, either because there are none, or because no target was set. + private[throttle] case object Idle extends State + // Active means we the target is set and we have a message queue that is non-empty. + private[throttle] case object Active extends State + + // Messages, as we queue them to be sent later + private[throttle] case class Message(message: Any, sender: ActorRef) + + // The data of the FSM + private[throttle] sealed case class Data(target: Option[ActorRef], + callsLeftInThisPeriod: Int, + queue: Q[Message]) +} + +/** + * A [[akka.contrib.throttle.Throttler]] that uses a timer to control the message delivery rate. + * + * ==Example== + * For example, if you set a rate like "3 messages in 1 second", the throttler + * will send the first three messages immediately to the target actor but will need to impose a delay before + * sending out further messages: + * {{{ + * // A simple actor that prints whatever it receives + * val printer = system.actorOf(Props(new Actor { + * def receive = { + * case x => println(x) + * } + * })) + * // The throttler for this example, setting the rate + * val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1.second)))) + * // Set the target + * throttler ! SetTarget(Some(printer)) + * // These three messages will be sent to the printer immediately + * throttler ! "1" + * throttler ! "2" + * throttler ! "3" + * // These two will wait at least until 1 second has passed + * throttler ! "4" + * throttler ! "5" + * }}} + * + * ==Implementation notes== + * This throttler implementation internally installs a timer that repeats every `rate.durationInMillis` and enables `rate.numberOfCalls` + * additional calls to take place. A `TimerBasedThrottler` uses very few system resources, provided the rate's duration is not too + * fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history + * as other throttlers may need to do. + * + * However, a `TimerBasedThrottler` only provides ''weak guarantees'' on the rate (see also + * this blog post): + * + * - Only ''delivery'' times are taken into account: if, for example, the throttler is used to throttle + * requests to an external web service then only the start times of the web requests are considered. + * If a web request takes very long on the server then more than `rate.numberOfCalls`-many requests + * may be observed on the server in an interval of duration `rate.durationInMillis()`. + * - There may be intervals of duration `rate.durationInMillis()` that contain more than `rate.numberOfCalls` + * message deliveries: a `TimerBasedThrottler` only makes guarantees for the intervals + * of its ''own'' timer, namely that no more than `rate.numberOfCalls`-many messages are delivered within such intervals. Other intervals on the + * timeline may contain more calls. + * + * For some applications, these guarantees may not be sufficient. + * + * ==Known issues== + * + * - If you change the rate using `SetRate(rate)`, the actual rate may in fact be higher for the + * overlapping period (i.e., `durationInMillis()`) of the new and old rate. Therefore, + * changing the rate frequently is not recommended with the current implementation. + * - The queue of messages to be delivered is not persisted in any way; actor or system failure will + * cause the queued messages to be lost. + * + * @see [[akka.contrib.throttle.Throttler]] + */ +class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[State, Data] { + + startWith(Idle, Data(None, rate.numberOfCalls, Q())) + + // Idle: no messages, or target not set + when(Idle) { + // Set the rate + case Event(SetRate(rate), d) ⇒ + this.rate = rate + stay using d.copy(callsLeftInThisPeriod = rate.numberOfCalls) + + // Set the target + case Event(SetTarget(t @ Some(_)), d) if !d.queue.isEmpty ⇒ + goto(Active) using deliverMessages(d.copy(target = t)) + case Event(SetTarget(t), d) ⇒ + stay using d.copy(target = t) + + // Queuing + case Event(msg, d @ Data(None, _, queue)) ⇒ + stay using d.copy(queue = queue.enqueue(Message(msg, context.sender))) + case Event(msg, d @ Data(Some(_), _, Seq())) ⇒ + goto(Active) using deliverMessages(d.copy(queue = Q(Message(msg, context.sender)))) + // Note: The case Event(msg, t @ Data(Some(_), _, _, Seq(_*))) should never happen here. + } + + when(Active) { + // Set the rate + case Event(SetRate(rate), d) ⇒ + this.rate = rate + // Note: this should be improved (see "Known issues" in class comments) + stopTimer() + startTimer(rate) + stay using d.copy(callsLeftInThisPeriod = rate.numberOfCalls) + + // Set the target (when the new target is None) + case Event(SetTarget(None), d) ⇒ + goto(Idle) using d.copy(target = None) + + // Set the target (when the new target is not None) + case Event(SetTarget(t @ Some(_)), d) ⇒ + stay using d.copy(target = t) + + // Period ends and we have no more messages + case Event(Tick, d @ Data(_, _, Seq())) ⇒ + goto(Idle) + + // Period ends and we get more occasions to send messages + case Event(Tick, d @ Data(_, _, _)) ⇒ + stay using deliverMessages(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) + + // Queue a message (when we cannot send messages in the current period anymore) + case Event(msg, d @ Data(_, 0, queue)) ⇒ + stay using d.copy(queue = queue.enqueue(Message(msg, context.sender))) + + // Queue a message (when we can send some more messages in the current period) + case Event(msg, d @ Data(_, _, queue)) ⇒ + stay using deliverMessages(d.copy(queue = queue.enqueue(Message(msg, context.sender)))) + } + + onTransition { + case Idle -> Active ⇒ startTimer(rate) + case Active -> Idle ⇒ stopTimer() + } + + initialize + + private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true) + private def stopTimer() = cancelTimer("morePermits") + + /** + * Send as many messages as we can (while respecting the rate) to the target and + * return the state data (with the queue containing the remaining ones). + */ + private def deliverMessages(data: Data): Data = { + val queue = data.queue + val nrOfMsgToSend = scala.math.min(queue.length, data.callsLeftInThisPeriod) + + queue.take(nrOfMsgToSend).foreach(x ⇒ data.target.get.tell(x.message, x.sender)) + + data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend) + } +} \ No newline at end of file diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala new file mode 100644 index 0000000000..4f8f422cb6 --- /dev/null +++ b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.contrib.throttle + +import language.postfixOps +import scala.concurrent.duration._ +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.Props +import akka.testkit.TestKit +import akka.testkit.ImplicitSender +import akka.contrib.throttle.Throttler._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +object TimerBasedThrottlerSpec { + class EchoActor extends Actor { + def receive = { + case x ⇒ sender ! x + } + } +} + +@RunWith(classOf[JUnitRunner]) +class TimerBasedThrottlerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpec with MustMatchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TimerBasedThrottlerSpec")) + + override def afterAll { + system.shutdown() + } + + "A throttler" must { + + "must pass the ScalaDoc class documentation example prgoram" in { + //#demo-code + // A simple actor that prints whatever it receives + val printer = system.actorOf(Props(new Actor { + def receive = { + case x ⇒ println(x) + } + })) + // The throttler for this example, setting the rate + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + // Set the target + throttler ! SetTarget(Some(printer)) + // These three messages will be sent to the echoer immediately + throttler ! "1" + throttler ! "2" + throttler ! "3" + // These two will wait until a second has passed + throttler ! "4" + throttler ! "5" + //#demo-code + } + + "keep messages until a target is set" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + expectNoMsg(1 second) + throttler ! SetTarget(Some(echo)) + within(2.seconds) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectMsg("4") + expectMsg("5") + expectMsg("6") + } + } + + "respect the rate (3 msg/s)" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + throttler ! SetTarget(Some(echo)) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + throttler ! "7" + within(1 second) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("4") + expectMsg("5") + expectMsg("6") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("7") + } + } + + "respect the rate (4 msg/s)" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(4 msgsPer (1 second)))) + throttler ! SetTarget(Some(echo)) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + throttler ! "7" + throttler ! "8" + throttler ! "9" + within(1 second) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectMsg("4") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("5") + expectMsg("6") + expectMsg("7") + expectMsg("8") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("9") + } + } + } +} \ No newline at end of file