diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index 9f5b57c513..5303c21c6d 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -29,6 +29,7 @@ The Current List of Modules .. toctree:: reliable-proxy + throttle Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-contrib/docs/throttle.rst b/akka-contrib/docs/throttle.rst new file mode 100644 index 0000000000..ab60fb6b96 --- /dev/null +++ b/akka-contrib/docs/throttle.rst @@ -0,0 +1,60 @@ +Throttling Actor Messages +========================= + +Introduction +------------ + +Suppose you are writing an application that makes HTTP requests to an external +web service and that this web service has a restriction in place: you may not +make more than 10 requests in 1 minute. You will get blocked or need to pay if +you don’t stay under this limit. In such a scenario you will want to employ +a *message throttler*. + +This extension module provides a simple implementation of a throttling actor, +the :class:`TimerBasedThrottler`. + + +How to use it +------------- + +You can use a :class:`TimerBasedThrottler` as follows: + +.. includecode:: @contribSrc@/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala#demo-code + +Please refer to the ScalaDoc documentation for the details. + + +The guarantees +-------------- + +:class:`TimerBasedThrottler` uses a timer internally. When the throttler’s rate is 3 msg/s, +for example, the throttler will start a timer that triggers +every second and each time will give the throttler exactly three "vouchers"; +each voucher gives the throttler a right to deliver a message. In this way, +at most 3 messages will be sent out by the throttler in each interval. + +It should be noted that such timer-based throttlers provide relatively **weak guarantees**: + +* Only *start times* are taken into account. This may be a problem if, for example, the + throttler is used to throttle requests to an external web service. If a web request + takes very long on the server then the rate *observed on the server* may be higher. +* A timer-based throttler only makes guarantees for the intervals of its own timer. In + our example, no more than 3 messages are delivered within such intervals. Other + intervals on the timeline, however, may contain more calls. + +The two cases are illustrated in the two figures below, each showing a timeline and three +intervals of the timer. The message delivery times chosen by the throttler are indicated +by dots, and as you can see, each interval contains at most 3 point, so the throttler +works correctly. Still, there is in each example an interval (the red one) that is +problematic. In the first scenario, this is because the delivery times are merely the +start times of longer requests (indicated by the four bars above the timeline that start +at the dots), so that the server observes four requests during the red interval. In the +second scenario, the messages are centered around one of the points in time where the +timer triggers, causing the red interval to contain too many messages. + +.. image:: throttler.png + +For some application scenarios, the guarantees provided by a timer-based throttler might +be too weak. Charles Cordingley’s `blog post `_ +discusses a throttler with stronger guarantees (it solves problem 2 from above). +Future versions of this module may feature throttlers with better guarantees. \ No newline at end of file diff --git a/akka-contrib/docs/throttler.png b/akka-contrib/docs/throttler.png new file mode 100644 index 0000000000..eab1a52a34 Binary files /dev/null and b/akka-contrib/docs/throttler.png differ diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala new file mode 100644 index 0000000000..12e98e89b2 --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala @@ -0,0 +1,294 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.contrib.throttle + +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.util.control.NonFatal +import scala.collection.immutable.{ Queue ⇒ Q } +import akka.actor.{ ActorRef, Actor, FSM } +import Throttler._ +import TimerBasedThrottler._ +import java.util.concurrent.TimeUnit +import akka.AkkaException + +/** + * Marker trait for throttlers. + * + * == Throttling == + * A throttler is an actor that is defined through a target actor and a rate + * (of type [[akka.contrib.throttle.Throttler.Rate]]). You set or change the target and rate at any time through the `SetTarget(target)` + * and `SetRate(rate)` messages, respectively. When you send the throttler any other message `msg`, it will + * put the message `msg` into an internal queue and eventually send all queued messages to the target, at + * a speed that respects the given rate. If no target is currently defined then the messages will be queued + * and will be delivered as soon as a target gets set. + * + * A [[akka.contrib.throttle.Throttler]] understands actor messages of type + * [[akka.contrib.throttle.Throttler.SetTarget]], [[akka.contrib.throttle.Throttler.SetRate]], in + * addition to any other messages, which the throttler will consider as messages to be sent to + * the target. + * + * == Transparency == + * Notice that the throttler `forward`s messages, i.e., the target will see the original message sender (and not the throttler) as the sender of the message. + * + * == Persistence == + * Throttlers usually use an internal queue to keep the messages that need to be sent to the target. + * You therefore cannot rely on the throttler's inbox size in order to learn how much messages are + * outstanding. + * + * It is left to the implementation whether the internal queue is persisted over application restarts or + * actor failure. + * + * == Processing messages == + * The target should process messages as fast as possible. If the target requires substantial time to + * process messages, it should distribute its work to other actors (using for example something like + * a `BalancingDispatcher`), otherwise the resulting system will always work below + * the threshold rate. + * + * Example: Suppose the throttler has a rate of 3msg/s and the target requires 1s to process a message. + * This system will only process messages at a rate of 1msg/s: the target will receive messages at at most 3msg/s + * but as it handles them synchronously and each of them takes 1s, its inbox will grow and grow. In such + * a situation, the target should distribute its messages to a set of worker actors so that individual messages + * can be handled in parallel. + * + * @see [[akka.contrib.throttle.TimerBasedThrottler]] + */ +trait Throttler { self: Actor ⇒ } + +/** + * Message types understood by [[akka.contrib.throttle.Throttler]]'s. + * + * @see [[akka.contrib.throttle.Throttler]] + * @see [[akka.contrib.throttle.Throttler.Rate]] + */ +object Throttler { + /** + * A rate used for throttling. + * + * There are some shorthands available to construct rates: + * {{{ + * import java.util.concurrent.TimeUnit._ + * import scala.concurrent.duration.{ Duration, FiniteDuration } + * + * val rate1 = 1 msgsPer (1, SECONDS) + * val rate2 = 1 msgsPer Duration(1, SECONDS) + * val rate3 = 1 msgsPer (1 seconds) + * val rate4 = 1 msgsPerSecond + * val rate5 = 1 msgsPerMinute + * val rate6 = 1 msgsPerHour + * }}} + * + * @param numberOfCalls the number of calls that may take place in a period + * @param duration the length of the period + * @see [[akka.contrib.throttle.Throttler]] + */ + case class Rate(val numberOfCalls: Int, val duration: FiniteDuration) { + /** + * The duration in milliseconds. + */ + def durationInMillis(): Long = duration.toMillis + } + + /** + * Set the target of a [[akka.contrib.throttle.Throttler]]. + * + * You may change a throttler's target at any time. + * + * Notice that the messages sent by the throttler to the target will have the original sender (and + * not the throttler) as the sender. (In Akka terms, the throttler `forward`s the message.) + * + * @param target if `target` is `None`, the throttler will stop delivering messages and the messages already received + * but not yet delivered, as well as any messages received in the future will be queued + * and eventually be delivered when a new target is set. If `target` is not `None`, the currently queued messages + * as well as any messages received in the the future will be delivered to the new target at a rate not exceeding the current throttler's rate. + */ + case class SetTarget(target: Option[ActorRef]) + + /** + * Set the rate of a [[akka.contrib.throttle.Throttler]]. + * + * You may change a throttler's rate at any time. + * + * @param rate the rate at which messages will be delivered to the target of the throttler + */ + case class SetRate(rate: Rate) + + import language.implicitConversions + + /** + * Helper for some syntactic sugar. + * + * @see [[akka.contrib.throttle.Throttler.Rate]] + */ + implicit class RateInt(numberOfCalls: Int) { + def msgsPer(duration: Int, timeUnit: TimeUnit) = Rate(numberOfCalls, Duration(duration, timeUnit)) + def msgsPer(duration: FiniteDuration) = Rate(numberOfCalls, duration) + def msgsPerSecond = Rate(numberOfCalls, Duration(1, TimeUnit.SECONDS)) + def msgsPerMinute = Rate(numberOfCalls, Duration(1, TimeUnit.MINUTES)) + def msgsPerHour = Rate(numberOfCalls, Duration(1, TimeUnit.HOURS)) + } + +} + +/** + * Implementation-specific internals. + */ +object TimerBasedThrottler { + private[throttle] case object Tick + + // States of the FSM + private[throttle] sealed trait State + // Idle means we don't deliver messages, either because there are none, or because no target was set. + private[throttle] case object Idle extends State + // Active means we the target is set and we have a message queue that is non-empty. + private[throttle] case object Active extends State + + // Messages, as we queue them to be sent later + private[throttle] case class Message(message: Any, sender: ActorRef) + + // The data of the FSM + private[throttle] sealed case class Data(target: Option[ActorRef], + callsLeftInThisPeriod: Int, + queue: Q[Message]) +} + +/** + * A [[akka.contrib.throttle.Throttler]] that uses a timer to control the message delivery rate. + * + * ==Example== + * For example, if you set a rate like "3 messages in 1 second", the throttler + * will send the first three messages immediately to the target actor but will need to impose a delay before + * sending out further messages: + * {{{ + * // A simple actor that prints whatever it receives + * val printer = system.actorOf(Props(new Actor { + * def receive = { + * case x => println(x) + * } + * })) + * // The throttler for this example, setting the rate + * val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1.second)))) + * // Set the target + * throttler ! SetTarget(Some(printer)) + * // These three messages will be sent to the printer immediately + * throttler ! "1" + * throttler ! "2" + * throttler ! "3" + * // These two will wait at least until 1 second has passed + * throttler ! "4" + * throttler ! "5" + * }}} + * + * ==Implementation notes== + * This throttler implementation internally installs a timer that repeats every `rate.durationInMillis` and enables `rate.numberOfCalls` + * additional calls to take place. A `TimerBasedThrottler` uses very few system resources, provided the rate's duration is not too + * fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history + * as other throttlers may need to do. + * + * However, a `TimerBasedThrottler` only provides ''weak guarantees'' on the rate (see also + * this blog post): + * + * - Only ''delivery'' times are taken into account: if, for example, the throttler is used to throttle + * requests to an external web service then only the start times of the web requests are considered. + * If a web request takes very long on the server then more than `rate.numberOfCalls`-many requests + * may be observed on the server in an interval of duration `rate.durationInMillis()`. + * - There may be intervals of duration `rate.durationInMillis()` that contain more than `rate.numberOfCalls` + * message deliveries: a `TimerBasedThrottler` only makes guarantees for the intervals + * of its ''own'' timer, namely that no more than `rate.numberOfCalls`-many messages are delivered within such intervals. Other intervals on the + * timeline may contain more calls. + * + * For some applications, these guarantees may not be sufficient. + * + * ==Known issues== + * + * - If you change the rate using `SetRate(rate)`, the actual rate may in fact be higher for the + * overlapping period (i.e., `durationInMillis()`) of the new and old rate. Therefore, + * changing the rate frequently is not recommended with the current implementation. + * - The queue of messages to be delivered is not persisted in any way; actor or system failure will + * cause the queued messages to be lost. + * + * @see [[akka.contrib.throttle.Throttler]] + */ +class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[State, Data] { + + startWith(Idle, Data(None, rate.numberOfCalls, Q())) + + // Idle: no messages, or target not set + when(Idle) { + // Set the rate + case Event(SetRate(rate), d) ⇒ + this.rate = rate + stay using d.copy(callsLeftInThisPeriod = rate.numberOfCalls) + + // Set the target + case Event(SetTarget(t @ Some(_)), d) if !d.queue.isEmpty ⇒ + goto(Active) using deliverMessages(d.copy(target = t)) + case Event(SetTarget(t), d) ⇒ + stay using d.copy(target = t) + + // Queuing + case Event(msg, d @ Data(None, _, queue)) ⇒ + stay using d.copy(queue = queue.enqueue(Message(msg, context.sender))) + case Event(msg, d @ Data(Some(_), _, Seq())) ⇒ + goto(Active) using deliverMessages(d.copy(queue = Q(Message(msg, context.sender)))) + // Note: The case Event(msg, t @ Data(Some(_), _, _, Seq(_*))) should never happen here. + } + + when(Active) { + // Set the rate + case Event(SetRate(rate), d) ⇒ + this.rate = rate + // Note: this should be improved (see "Known issues" in class comments) + stopTimer() + startTimer(rate) + stay using d.copy(callsLeftInThisPeriod = rate.numberOfCalls) + + // Set the target (when the new target is None) + case Event(SetTarget(None), d) ⇒ + goto(Idle) using d.copy(target = None) + + // Set the target (when the new target is not None) + case Event(SetTarget(t @ Some(_)), d) ⇒ + stay using d.copy(target = t) + + // Period ends and we have no more messages + case Event(Tick, d @ Data(_, _, Seq())) ⇒ + goto(Idle) + + // Period ends and we get more occasions to send messages + case Event(Tick, d @ Data(_, _, _)) ⇒ + stay using deliverMessages(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) + + // Queue a message (when we cannot send messages in the current period anymore) + case Event(msg, d @ Data(_, 0, queue)) ⇒ + stay using d.copy(queue = queue.enqueue(Message(msg, context.sender))) + + // Queue a message (when we can send some more messages in the current period) + case Event(msg, d @ Data(_, _, queue)) ⇒ + stay using deliverMessages(d.copy(queue = queue.enqueue(Message(msg, context.sender)))) + } + + onTransition { + case Idle -> Active ⇒ startTimer(rate) + case Active -> Idle ⇒ stopTimer() + } + + initialize + + private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true) + private def stopTimer() = cancelTimer("morePermits") + + /** + * Send as many messages as we can (while respecting the rate) to the target and + * return the state data (with the queue containing the remaining ones). + */ + private def deliverMessages(data: Data): Data = { + val queue = data.queue + val nrOfMsgToSend = scala.math.min(queue.length, data.callsLeftInThisPeriod) + + queue.take(nrOfMsgToSend).foreach(x ⇒ data.target.get.tell(x.message, x.sender)) + + data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend) + } +} \ No newline at end of file diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala new file mode 100644 index 0000000000..4f8f422cb6 --- /dev/null +++ b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.contrib.throttle + +import language.postfixOps +import scala.concurrent.duration._ +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.Props +import akka.testkit.TestKit +import akka.testkit.ImplicitSender +import akka.contrib.throttle.Throttler._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +object TimerBasedThrottlerSpec { + class EchoActor extends Actor { + def receive = { + case x ⇒ sender ! x + } + } +} + +@RunWith(classOf[JUnitRunner]) +class TimerBasedThrottlerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpec with MustMatchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("TimerBasedThrottlerSpec")) + + override def afterAll { + system.shutdown() + } + + "A throttler" must { + + "must pass the ScalaDoc class documentation example prgoram" in { + //#demo-code + // A simple actor that prints whatever it receives + val printer = system.actorOf(Props(new Actor { + def receive = { + case x ⇒ println(x) + } + })) + // The throttler for this example, setting the rate + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + // Set the target + throttler ! SetTarget(Some(printer)) + // These three messages will be sent to the echoer immediately + throttler ! "1" + throttler ! "2" + throttler ! "3" + // These two will wait until a second has passed + throttler ! "4" + throttler ! "5" + //#demo-code + } + + "keep messages until a target is set" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + expectNoMsg(1 second) + throttler ! SetTarget(Some(echo)) + within(2.seconds) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectMsg("4") + expectMsg("5") + expectMsg("6") + } + } + + "respect the rate (3 msg/s)" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(3 msgsPer (1 second)))) + throttler ! SetTarget(Some(echo)) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + throttler ! "7" + within(1 second) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("4") + expectMsg("5") + expectMsg("6") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("7") + } + } + + "respect the rate (4 msg/s)" in { + val echo = system.actorOf(Props[TimerBasedThrottlerSpec.EchoActor]) + val throttler = system.actorOf(Props(new TimerBasedThrottler(4 msgsPer (1 second)))) + throttler ! SetTarget(Some(echo)) + throttler ! "1" + throttler ! "2" + throttler ! "3" + throttler ! "4" + throttler ! "5" + throttler ! "6" + throttler ! "7" + throttler ! "8" + throttler ! "9" + within(1 second) { + expectMsg("1") + expectMsg("2") + expectMsg("3") + expectMsg("4") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("5") + expectMsg("6") + expectMsg("7") + expectMsg("8") + expectNoMsg(remaining) + } + within(1 second) { + expectMsg("9") + } + } + } +} \ No newline at end of file