Fixing ticket #703 and reformatting Pool.scala

This commit is contained in:
Viktor Klang 2011-03-14 16:03:47 +01:00
parent 562928184f
commit 2c1ea6969d

View file

@ -5,6 +5,7 @@
package akka.routing
import akka.actor.{Actor, ActorRef, EventHandler}
import java.util.concurrent.TimeUnit
/**
* Actor pooling
@ -25,20 +26,18 @@ import akka.actor.{Actor, ActorRef, EventHandler}
* @author Garrick Evans
*/
object ActorPool
{
case object Stat
case class Stats(size:Int)
object ActorPool {
case object Stat
case class Stats(size:Int)
}
/**
* Defines the nature of an actor pool.
*/
trait ActorPool
{
def instance():ActorRef
def capacity(delegates:Seq[ActorRef]):Int
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int]
trait ActorPool {
def instance(): ActorRef
def capacity(delegates: Seq[ActorRef]): Int
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int]
}
/**
@ -46,77 +45,64 @@ trait ActorPool
* - checks the current capacity and adjusts accordingly if needed
* - routes the incoming message to a selection set of delegate actors
*/
trait DefaultActorPool extends ActorPool
{
this: Actor =>
import ActorPool._
import collection.mutable.LinkedList
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
protected var _delegates = LinkedList[ActorRef]()
private var _lastCapacityChange = 0
private var _lastSelectorCount = 0
override def postStop = _delegates foreach {_ stop}
protected def _route:Receive =
{
//
// for testing...
//
case Stat =>
self reply_? Stats(_delegates length)
case max:MaximumNumberOfRestartsWithinTimeRangeReached =>
_delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)}
trait DefaultActorPool extends ActorPool { this: Actor =>
case msg =>
_capacity
_select foreach {delegate =>
self.senderFuture match {
case None => delegate ! msg
case Some(future) =>
Actor.spawn {
try {
future completeWithResult (delegate !! msg).getOrElse(None)
} catch {
case e =>
EventHandler notifyListeners EventHandler.Error(e, this)
future completeWithException e
}
}
}
}
import ActorPool._
import collection.mutable.LinkedList
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
protected var _delegates = LinkedList[ActorRef]()
private var _lastCapacityChange = 0
private var _lastSelectorCount = 0
override def postStop = _delegates foreach {_ stop}
protected def _route(): Receive = {
// for testing...
case Stat =>
self reply_? Stats(_delegates length)
case max: MaximumNumberOfRestartsWithinTimeRangeReached =>
_delegates = _delegates filterNot { _.uuid == max.victim.uuid }
case msg =>
_capacity()
_select() foreach { delegate =>
self.senderFuture match {
case None =>
delegate ! msg
case Some(future) =>
future completeWith (delegate.!!!(msg, TimeUnit.NANOSECONDS.toMillis(future.timeoutInNanos)))
}
private def _capacity =
{
_lastCapacityChange = capacity(_delegates)
if (_lastCapacityChange > 0) {
_delegates ++= {
for (i <- 0 until _lastCapacityChange) yield {
val delegate = instance()
self startLink delegate
delegate
}
}
}
else if (_lastCapacityChange < 0) {
val s = _delegates splitAt(_delegates.length + _lastCapacityChange)
s._2 foreach {_ stop}
_delegates = s._1
}
}
private def _select =
{
val s = select(_delegates)
_lastSelectorCount = s._2
s._1
}
}
private def _capacity() = {
_lastCapacityChange = capacity(_delegates)
if (_lastCapacityChange > 0) {
_delegates ++= {
for (i <- 0 until _lastCapacityChange) yield {
val delegate = instance()
self startLink delegate
delegate
}
}
}
else if (_lastCapacityChange < 0) {
_delegates splitAt(_delegates.length + _lastCapacityChange) match {
case (keep, abandon) =>
abandon foreach { _.stop }
_delegates = keep
}
}
}
private def _select() = select(_delegates) match {
case (delegates, count) =>
_lastSelectorCount = count
delegates
}
}
@ -128,54 +114,46 @@ trait DefaultActorPool extends ActorPool
/**
* Returns the set of delegates with the least amount of message backlog.
*/
trait SmallestMailboxSelector
{
def selectionCount:Int
def partialFill:Boolean
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
{
var set:Seq[ActorRef] = Nil
var take = {
if (partialFill) math.min(selectionCount, delegates.length)
else selectionCount
}
while (take > 0) {
set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set
take -= set.size
}
trait SmallestMailboxSelector {
def selectionCount: Int
def partialFill: Boolean
(set.iterator, set.size)
}
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
var set: Seq[ActorRef] = Nil
var take = if (partialFill) math.min(selectionCount, delegates.length)
else selectionCount
while (take > 0) {
set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set
take -= set.size
}
(set.iterator, set.size)
}
}
/**
* Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection
*/
trait RoundRobinSelector
{
private var _last:Int = -1;
def selectionCount:Int
def partialFill:Boolean
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
{
val length = delegates.length
val take = {
if (partialFill) math.min(selectionCount, length)
else selectionCount
}
var set = for (i <- 0 to take) yield {
_last += 1
if (_last >= length) _last = 0
delegates(_last)
}
(set.iterator, set.size)
}
trait RoundRobinSelector {
private var _last: Int = -1;
def selectionCount: Int
def partialFill: Boolean
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = {
val length = delegates.length
val take = if (partialFill) math.min(selectionCount, length)
else selectionCount
val set =
for (i <- 0 to take) yield {
_last = (_last + 1) % length
delegates(_last)
}
(set.iterator, set.size)
}
}
@ -187,80 +165,62 @@ trait RoundRobinSelector
/**
* Ensures a fixed number of delegates in the pool
*/
trait FixedSizeCapacitor
{
def limit:Int
def capacity(delegates:Seq[ActorRef]):Int =
{
val d = limit - delegates.size
if (d>0) d
else 0
}
trait FixedSizeCapacitor {
def limit:Int
def capacity(delegates: Seq[ActorRef]): Int = (limit - delegates.size) match {
case i if i > 0 => i
case _ => 0
}
}
/**
* Constrains the pool capacity to a bounded range
*/
trait BoundedCapacitor
{
def lowerBound:Int
def upperBound:Int
def capacity(delegates:Seq[ActorRef]):Int =
{
val current = delegates length
var delta = _eval(delegates)
val proposed = current + delta
if (proposed < lowerBound) delta += (lowerBound - proposed)
else if (proposed > upperBound) delta -= (proposed - upperBound)
delta
}
protected def _eval(delegates:Seq[ActorRef]):Int
trait BoundedCapacitor {
def lowerBound: Int
def upperBound: Int
def capacity(delegates: Seq[ActorRef]): Int = {
val current = delegates length
val delta = _eval(delegates)
val proposed = current + delta
if (proposed < lowerBound) delta + (lowerBound - proposed)
else if (proposed > upperBound) delta - (proposed - upperBound)
else delta
}
protected def _eval(delegates: Seq[ActorRef]): Int
}
/**
* Returns the number of delegates required to manage the current message backlogs
*/
trait MailboxPressureCapacitor
{
def pressureThreshold:Int
def pressure(delegates:Seq[ActorRef]):Int =
{
var n = 0;
delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1}
n
}
trait MailboxPressureCapacitor {
def pressureThreshold:Int
def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.mailboxSize > pressureThreshold }
}
/**
* Returns the number of delegates required to respond to the number of pending futures
*/
trait ActiveFuturesPressureCapacitor
{
def pressure(delegates:Seq[ActorRef]):Int =
{
var n = 0;
delegates foreach {d => if (d.senderFuture.isDefined) n+=1}
n
}
trait ActiveFuturesPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.senderFuture.isDefined }
}
/**
*/
trait CapacityStrategy
{
import ActorPool._
def pressure(delegates:Seq[ActorRef]):Int
def filter(pressure:Int, capacity:Int):Int
protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size)
trait CapacityStrategy {
import ActorPool._
def pressure(delegates: Seq[ActorRef]): Int
def filter(pressure: Int, capacity: Int): Int
protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
}
trait FixedCapacityStrategy extends FixedSizeCapacitor
@ -274,20 +234,15 @@ trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
/**
* The basic filter trait that composes ramp-up and and back-off subfiltering.
*/
trait Filter
{
def rampup(pressure:Int, capacity:Int):Int
def backoff(pressure:Int, capacity:Int):Int
trait Filter {
def rampup(pressure: Int, capacity: Int): Int
def backoff(pressure: Int, capacity: Int): Int
def filter(pressure:Int, capacity:Int):Int =
{
//
// pass through both filters just to be sure any internal counters
// are updated consistently. ramping up is always + and backing off
// is always - and each should return 0 otherwise...
//
rampup (pressure, capacity) + backoff (pressure, capacity)
}
// pass through both filters just to be sure any internal counters
// are updated consistently. ramping up is always + and backing off
// is always - and each should return 0 otherwise...
def filter(pressure: Int, capacity: Int): Int =
rampup (pressure, capacity) + backoff (pressure, capacity)
}
trait BasicFilter extends Filter with BasicRampup with BasicBackoff
@ -295,40 +250,29 @@ trait BasicFilter extends Filter with BasicRampup with BasicBackoff
/**
* Filter performs steady incremental growth using only the basic ramp-up subfilter
*/
trait BasicNoBackoffFilter extends BasicRampup
{
def filter(pressure:Int, capacity:Int):Int = rampup(pressure, capacity)
trait BasicNoBackoffFilter extends BasicRampup {
def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity)
}
/**
* Basic incremental growth as a percentage of the current pool capacity
*/
trait BasicRampup
{
def rampupRate:Double
trait BasicRampup {
def rampupRate: Double
def rampup(pressure:Int, capacity:Int):Int =
{
if (pressure < capacity) 0
else math.ceil(rampupRate * capacity) toInt
}
def rampup(pressure: Int, capacity: Int): Int =
if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt
}
/**
* Basic decrement as a percentage of the current pool capacity
*/
trait BasicBackoff
{
def backoffThreshold:Double
def backoffRate:Double
trait BasicBackoff {
def backoffThreshold: Double
def backoffRate: Double
def backoff(pressure:Int, capacity:Int):Int =
{
if (capacity > 0 && pressure/capacity < backoffThreshold)
math.ceil(-1.0 * backoffRate * capacity) toInt
else
0
}
def backoff(pressure: Int, capacity: Int): Int =
if (capacity > 0 && pressure / capacity < backoffThreshold) math.ceil(-1.0 * backoffRate * capacity) toInt else 0
}
/**
* This filter tracks the average pressure over the lifetime of the pool (or since last reset) and
@ -336,29 +280,23 @@ trait BasicBackoff
* delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied
* by the difference in capacity and pressure.
*/
trait RunningMeanBackoff
{
def backoffThreshold:Double
def backoffRate:Double
trait RunningMeanBackoff {
def backoffThreshold: Double
def backoffRate: Double
private var _pressure:Double = 0.0
private var _capacity:Double = 0.0
private var _pressure: Double = 0.0
private var _capacity: Double = 0.0
def backoff(pressure:Int, capacity:Int):Int =
{
def backoff(pressure: Int, capacity: Int): Int = {
_pressure += pressure
_capacity += capacity
if (capacity > 0 && pressure/capacity < backoffThreshold &&
_capacity > 0 && _pressure/_capacity < backoffThreshold) {
if (capacity > 0 && pressure / capacity < backoffThreshold && _capacity > 0 && _pressure / _capacity < backoffThreshold)
math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt
}
else
0
else 0
}
def backoffReset =
{
def backoffReset = {
_pressure - 0.0
_capacity = 0.0
}