2011-02-01 14:56:34 -08:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.routing
|
|
|
|
|
|
2011-03-02 00:14:45 +01:00
|
|
|
import akka.actor.{Actor, ActorRef, ErrorHandler, ErrorHandlerEvent}
|
2011-02-01 14:56:34 -08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Actor pooling
|
|
|
|
|
*
|
|
|
|
|
* An actor pool is an message router for a set of delegate actors. The pool is an actor itself.
|
|
|
|
|
* There are a handful of basic concepts that need to be understood when working with and defining your pool.
|
|
|
|
|
*
|
|
|
|
|
* Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
|
|
|
|
|
* Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types.
|
2011-02-28 22:55:02 +01:00
|
|
|
* The first determines the size itself - either fixed or bounded.
|
|
|
|
|
* The second determines how to adjust of the pool according to some internal pressure characteristic.
|
2011-02-01 14:56:34 -08:00
|
|
|
* Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
|
|
|
|
|
*
|
|
|
|
|
* It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say
|
|
|
|
|
* that one couldn't instance different classes within the pool, only that the pool, when selecting and routing,
|
|
|
|
|
* will not take any type information into consideration.
|
|
|
|
|
*
|
|
|
|
|
* @author Garrick Evans
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
object ActorPool
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
case object Stat
|
|
|
|
|
case class Stats(size:Int)
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Defines the nature of an actor pool.
|
|
|
|
|
*/
|
|
|
|
|
trait ActorPool
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def instance():ActorRef
|
|
|
|
|
def capacity(delegates:Seq[ActorRef]):Int
|
|
|
|
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int]
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A default implementation of a pool, on each message to route,
|
2011-02-28 22:55:02 +01:00
|
|
|
* - checks the current capacity and adjusts accordingly if needed
|
|
|
|
|
* - routes the incoming message to a selection set of delegate actors
|
2011-02-01 14:56:34 -08:00
|
|
|
*/
|
|
|
|
|
trait DefaultActorPool extends ActorPool
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
this: Actor =>
|
|
|
|
|
|
|
|
|
|
import ActorPool._
|
|
|
|
|
import collection.mutable.LinkedList
|
|
|
|
|
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected var _delegates = LinkedList[ActorRef]()
|
|
|
|
|
private var _lastCapacityChange = 0
|
|
|
|
|
private var _lastSelectorCount = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
override def postStop = _delegates foreach {_ stop}
|
|
|
|
|
|
|
|
|
|
protected def _route:Receive =
|
|
|
|
|
{
|
|
|
|
|
//
|
|
|
|
|
// for testing...
|
|
|
|
|
//
|
|
|
|
|
case Stat =>
|
|
|
|
|
self reply_? Stats(_delegates length)
|
|
|
|
|
|
|
|
|
|
case max:MaximumNumberOfRestartsWithinTimeRangeReached =>
|
|
|
|
|
_delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)}
|
|
|
|
|
|
|
|
|
|
case msg =>
|
|
|
|
|
_capacity
|
|
|
|
|
_select foreach {delegate =>
|
|
|
|
|
self.senderFuture match {
|
|
|
|
|
case None => delegate ! msg
|
|
|
|
|
case Some(future) =>
|
|
|
|
|
Actor.spawn {
|
|
|
|
|
try {
|
|
|
|
|
future completeWithResult (delegate !! msg).getOrElse(None)
|
|
|
|
|
} catch {
|
2011-03-02 00:14:45 +01:00
|
|
|
case e =>
|
|
|
|
|
ErrorHandler notifyListeners ErrorHandlerEvent(e, this)
|
|
|
|
|
future completeWithException e
|
2011-02-28 22:55:02 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def _capacity =
|
|
|
|
|
{
|
|
|
|
|
_lastCapacityChange = capacity(_delegates)
|
|
|
|
|
if (_lastCapacityChange > 0) {
|
|
|
|
|
_delegates ++= {
|
|
|
|
|
for (i <- 0 until _lastCapacityChange) yield {
|
|
|
|
|
val delegate = instance()
|
|
|
|
|
self startLink delegate
|
|
|
|
|
delegate
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (_lastCapacityChange < 0) {
|
|
|
|
|
val s = _delegates splitAt(_delegates.length + _lastCapacityChange)
|
|
|
|
|
s._2 foreach {_ stop}
|
|
|
|
|
_delegates = s._1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def _select =
|
|
|
|
|
{
|
|
|
|
|
val s = select(_delegates)
|
|
|
|
|
_lastSelectorCount = s._2
|
|
|
|
|
s._1
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Selectors
|
2011-02-28 22:55:02 +01:00
|
|
|
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
|
2011-02-01 14:56:34 -08:00
|
|
|
**/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the set of delegates with the least amount of message backlog.
|
|
|
|
|
*/
|
|
|
|
|
trait SmallestMailboxSelector
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def selectionCount:Int
|
|
|
|
|
def partialFill:Boolean
|
|
|
|
|
|
|
|
|
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
|
|
|
|
{
|
|
|
|
|
var set:Seq[ActorRef] = Nil
|
|
|
|
|
var take = {
|
|
|
|
|
if (partialFill) math.min(selectionCount, delegates.length)
|
|
|
|
|
else selectionCount
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (take > 0) {
|
|
|
|
|
set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set
|
|
|
|
|
take -= set.size
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(set.iterator, set.size)
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection
|
|
|
|
|
*/
|
|
|
|
|
trait RoundRobinSelector
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
private var _last:Int = -1;
|
|
|
|
|
|
|
|
|
|
def selectionCount:Int
|
|
|
|
|
def partialFill:Boolean
|
|
|
|
|
|
|
|
|
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
|
|
|
|
{
|
|
|
|
|
val length = delegates.length
|
|
|
|
|
val take = {
|
|
|
|
|
if (partialFill) math.min(selectionCount, length)
|
|
|
|
|
else selectionCount
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var set = for (i <- 0 to take) yield {
|
|
|
|
|
_last += 1
|
|
|
|
|
if (_last >= length) _last = 0
|
|
|
|
|
delegates(_last)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(set.iterator, set.size)
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Capacitors
|
2011-02-28 22:55:02 +01:00
|
|
|
* These traits define how to alter the size of the pool
|
2011-02-01 14:56:34 -08:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Ensures a fixed number of delegates in the pool
|
|
|
|
|
*/
|
|
|
|
|
trait FixedSizeCapacitor
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def limit:Int
|
|
|
|
|
|
|
|
|
|
def capacity(delegates:Seq[ActorRef]):Int =
|
|
|
|
|
{
|
|
|
|
|
val d = limit - delegates.size
|
|
|
|
|
if (d>0) d
|
|
|
|
|
else 0
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Constrains the pool capacity to a bounded range
|
|
|
|
|
*/
|
|
|
|
|
trait BoundedCapacitor
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def lowerBound:Int
|
|
|
|
|
def upperBound:Int
|
|
|
|
|
|
|
|
|
|
def capacity(delegates:Seq[ActorRef]):Int =
|
|
|
|
|
{
|
|
|
|
|
val current = delegates length
|
|
|
|
|
var delta = _eval(delegates)
|
|
|
|
|
val proposed = current + delta
|
|
|
|
|
|
|
|
|
|
if (proposed < lowerBound) delta += (lowerBound - proposed)
|
|
|
|
|
else if (proposed > upperBound) delta -= (proposed - upperBound)
|
|
|
|
|
|
|
|
|
|
delta
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def _eval(delegates:Seq[ActorRef]):Int
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the number of delegates required to manage the current message backlogs
|
|
|
|
|
*/
|
|
|
|
|
trait MailboxPressureCapacitor
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def pressureThreshold:Int
|
|
|
|
|
|
|
|
|
|
def pressure(delegates:Seq[ActorRef]):Int =
|
|
|
|
|
{
|
|
|
|
|
var n = 0;
|
|
|
|
|
delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1}
|
|
|
|
|
n
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the number of delegates required to respond to the number of pending futures
|
|
|
|
|
*/
|
|
|
|
|
trait ActiveFuturesPressureCapacitor
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
def pressure(delegates:Seq[ActorRef]):Int =
|
|
|
|
|
{
|
|
|
|
|
var n = 0;
|
|
|
|
|
delegates foreach {d => if (d.senderFuture.isDefined) n+=1}
|
|
|
|
|
n
|
|
|
|
|
}
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*/
|
|
|
|
|
trait CapacityStrategy
|
|
|
|
|
{
|
2011-02-28 22:55:02 +01:00
|
|
|
import ActorPool._
|
|
|
|
|
|
|
|
|
|
def pressure(delegates:Seq[ActorRef]):Int
|
|
|
|
|
def filter(pressure:Int, capacity:Int):Int
|
|
|
|
|
|
|
|
|
|
protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size)
|
2011-02-01 14:56:34 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait FixedCapacityStrategy extends FixedSizeCapacitor
|
|
|
|
|
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
|
2011-02-15 15:58:43 -08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Filters
|
|
|
|
|
* These traits refine the raw pressure reading into a more appropriate capacity delta.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The basic filter trait that composes ramp-up and and back-off subfiltering.
|
|
|
|
|
*/
|
|
|
|
|
trait Filter
|
|
|
|
|
{
|
|
|
|
|
def rampup(pressure:Int, capacity:Int):Int
|
|
|
|
|
def backoff(pressure:Int, capacity:Int):Int
|
|
|
|
|
|
|
|
|
|
def filter(pressure:Int, capacity:Int):Int =
|
|
|
|
|
{
|
|
|
|
|
//
|
|
|
|
|
// pass through both filters just to be sure any internal counters
|
|
|
|
|
// are updated consistently. ramping up is always + and backing off
|
|
|
|
|
// is always - and each should return 0 otherwise...
|
|
|
|
|
//
|
2011-02-28 22:55:02 +01:00
|
|
|
rampup (pressure, capacity) + backoff (pressure, capacity)
|
2011-02-15 15:58:43 -08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait BasicFilter extends Filter with BasicRampup with BasicBackoff
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Filter performs steady incremental growth using only the basic ramp-up subfilter
|
|
|
|
|
*/
|
|
|
|
|
trait BasicNoBackoffFilter extends BasicRampup
|
|
|
|
|
{
|
|
|
|
|
def filter(pressure:Int, capacity:Int):Int = rampup(pressure, capacity)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Basic incremental growth as a percentage of the current pool capacity
|
|
|
|
|
*/
|
|
|
|
|
trait BasicRampup
|
|
|
|
|
{
|
|
|
|
|
def rampupRate:Double
|
|
|
|
|
|
|
|
|
|
def rampup(pressure:Int, capacity:Int):Int =
|
|
|
|
|
{
|
|
|
|
|
if (pressure < capacity) 0
|
|
|
|
|
else math.ceil(rampupRate * capacity) toInt
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Basic decrement as a percentage of the current pool capacity
|
|
|
|
|
*/
|
|
|
|
|
trait BasicBackoff
|
|
|
|
|
{
|
|
|
|
|
def backoffThreshold:Double
|
|
|
|
|
def backoffRate:Double
|
|
|
|
|
|
|
|
|
|
def backoff(pressure:Int, capacity:Int):Int =
|
|
|
|
|
{
|
|
|
|
|
if (capacity > 0 && pressure/capacity < backoffThreshold)
|
|
|
|
|
math.ceil(-1.0 * backoffRate * capacity) toInt
|
|
|
|
|
else
|
|
|
|
|
0
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* This filter tracks the average pressure over the lifetime of the pool (or since last reset) and
|
|
|
|
|
* will begin to reduce capacity once this value drops below the provided threshold. The number of
|
|
|
|
|
* delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied
|
|
|
|
|
* by the difference in capacity and pressure.
|
|
|
|
|
*/
|
|
|
|
|
trait RunningMeanBackoff
|
|
|
|
|
{
|
|
|
|
|
def backoffThreshold:Double
|
|
|
|
|
def backoffRate:Double
|
|
|
|
|
|
|
|
|
|
private var _pressure:Double = 0.0
|
|
|
|
|
private var _capacity:Double = 0.0
|
|
|
|
|
|
|
|
|
|
def backoff(pressure:Int, capacity:Int):Int =
|
|
|
|
|
{
|
|
|
|
|
_pressure += pressure
|
|
|
|
|
_capacity += capacity
|
|
|
|
|
|
|
|
|
|
if (capacity > 0 && pressure/capacity < backoffThreshold &&
|
2011-02-28 22:55:02 +01:00
|
|
|
_capacity > 0 && _pressure/_capacity < backoffThreshold) {
|
2011-02-15 15:58:43 -08:00
|
|
|
math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def backoffReset =
|
|
|
|
|
{
|
|
|
|
|
_pressure - 0.0
|
|
|
|
|
_capacity = 0.0
|
|
|
|
|
}
|
|
|
|
|
}
|