diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index d8437590d1..30a70fb2bc 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -5,6 +5,7 @@ package akka.routing import akka.actor.{Actor, ActorRef, EventHandler} +import java.util.concurrent.TimeUnit /** * Actor pooling @@ -25,20 +26,18 @@ import akka.actor.{Actor, ActorRef, EventHandler} * @author Garrick Evans */ -object ActorPool -{ - case object Stat - case class Stats(size:Int) +object ActorPool { + case object Stat + case class Stats(size:Int) } /** * Defines the nature of an actor pool. */ -trait ActorPool -{ - def instance():ActorRef - def capacity(delegates:Seq[ActorRef]):Int - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] +trait ActorPool { + def instance(): ActorRef + def capacity(delegates: Seq[ActorRef]): Int + def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] } /** @@ -46,77 +45,64 @@ trait ActorPool * - checks the current capacity and adjusts accordingly if needed * - routes the incoming message to a selection set of delegate actors */ -trait DefaultActorPool extends ActorPool -{ - this: Actor => - - import ActorPool._ - import collection.mutable.LinkedList - import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached - - - protected var _delegates = LinkedList[ActorRef]() - private var _lastCapacityChange = 0 - private var _lastSelectorCount = 0 - - - override def postStop = _delegates foreach {_ stop} - - protected def _route:Receive = - { - // - // for testing... - // - case Stat => - self reply_? Stats(_delegates length) - - case max:MaximumNumberOfRestartsWithinTimeRangeReached => - _delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)} +trait DefaultActorPool extends ActorPool { this: Actor => - case msg => - _capacity - _select foreach {delegate => - self.senderFuture match { - case None => delegate ! msg - case Some(future) => - Actor.spawn { - try { - future completeWithResult (delegate !! msg).getOrElse(None) - } catch { - case e => - EventHandler notifyListeners EventHandler.Error(e, this) - future completeWithException e - } - } - } - } + import ActorPool._ + import collection.mutable.LinkedList + import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached + + + protected var _delegates = LinkedList[ActorRef]() + private var _lastCapacityChange = 0 + private var _lastSelectorCount = 0 + + + override def postStop = _delegates foreach {_ stop} + + protected def _route(): Receive = { + // for testing... + case Stat => + self reply_? Stats(_delegates length) + case max: MaximumNumberOfRestartsWithinTimeRangeReached => + _delegates = _delegates filterNot { _.uuid == max.victim.uuid } + + case msg => + _capacity() + _select() foreach { delegate => + self.senderFuture match { + case None => + delegate ! msg + case Some(future) => + future completeWith (delegate.!!!(msg, TimeUnit.NANOSECONDS.toMillis(future.timeoutInNanos))) } - - private def _capacity = - { - _lastCapacityChange = capacity(_delegates) - if (_lastCapacityChange > 0) { - _delegates ++= { - for (i <- 0 until _lastCapacityChange) yield { - val delegate = instance() - self startLink delegate - delegate - } - } - } - else if (_lastCapacityChange < 0) { - val s = _delegates splitAt(_delegates.length + _lastCapacityChange) - s._2 foreach {_ stop} - _delegates = s._1 - } - } - - private def _select = - { - val s = select(_delegates) - _lastSelectorCount = s._2 - s._1 + } + } + + private def _capacity() = { + _lastCapacityChange = capacity(_delegates) + if (_lastCapacityChange > 0) { + _delegates ++= { + for (i <- 0 until _lastCapacityChange) yield { + val delegate = instance() + self startLink delegate + delegate } + } + } + else if (_lastCapacityChange < 0) { + _delegates splitAt(_delegates.length + _lastCapacityChange) match { + case (keep, abandon) => + abandon foreach { _.stop } + _delegates = keep + } + } + } + + private def _select() = select(_delegates) match { + case (delegates, count) => + _lastSelectorCount = count + delegates + } } @@ -128,54 +114,46 @@ trait DefaultActorPool extends ActorPool /** * Returns the set of delegates with the least amount of message backlog. */ -trait SmallestMailboxSelector -{ - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - var set:Seq[ActorRef] = Nil - var take = { - if (partialFill) math.min(selectionCount, delegates.length) - else selectionCount - } - - while (take > 0) { - set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set - take -= set.size - } +trait SmallestMailboxSelector { + def selectionCount: Int + def partialFill: Boolean - (set.iterator, set.size) - } + def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = { + var set: Seq[ActorRef] = Nil + var take = if (partialFill) math.min(selectionCount, delegates.length) + else selectionCount + + while (take > 0) { + set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set + take -= set.size + } + + (set.iterator, set.size) + } } /** * Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection */ -trait RoundRobinSelector -{ - private var _last:Int = -1; - - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - val length = delegates.length - val take = { - if (partialFill) math.min(selectionCount, length) - else selectionCount - } - - var set = for (i <- 0 to take) yield { - _last += 1 - if (_last >= length) _last = 0 - delegates(_last) - } - - (set.iterator, set.size) - } +trait RoundRobinSelector { + private var _last: Int = -1; + + def selectionCount: Int + def partialFill: Boolean + + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = { + val length = delegates.length + val take = if (partialFill) math.min(selectionCount, length) + else selectionCount + + val set = + for (i <- 0 to take) yield { + _last = (_last + 1) % length + delegates(_last) + } + + (set.iterator, set.size) + } } @@ -187,80 +165,62 @@ trait RoundRobinSelector /** * Ensures a fixed number of delegates in the pool */ -trait FixedSizeCapacitor -{ - def limit:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val d = limit - delegates.size - if (d>0) d - else 0 - } +trait FixedSizeCapacitor { + def limit:Int + + def capacity(delegates: Seq[ActorRef]): Int = (limit - delegates.size) match { + case i if i > 0 => i + case _ => 0 + } } /** * Constrains the pool capacity to a bounded range */ -trait BoundedCapacitor -{ - def lowerBound:Int - def upperBound:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val current = delegates length - var delta = _eval(delegates) - val proposed = current + delta - - if (proposed < lowerBound) delta += (lowerBound - proposed) - else if (proposed > upperBound) delta -= (proposed - upperBound) - - delta - } - - protected def _eval(delegates:Seq[ActorRef]):Int +trait BoundedCapacitor { + def lowerBound: Int + def upperBound: Int + + def capacity(delegates: Seq[ActorRef]): Int = { + val current = delegates length + val delta = _eval(delegates) + val proposed = current + delta + + if (proposed < lowerBound) delta + (lowerBound - proposed) + else if (proposed > upperBound) delta - (proposed - upperBound) + else delta + } + + protected def _eval(delegates: Seq[ActorRef]): Int } /** * Returns the number of delegates required to manage the current message backlogs */ -trait MailboxPressureCapacitor -{ - def pressureThreshold:Int - - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1} - n - } +trait MailboxPressureCapacitor { + def pressureThreshold:Int + def pressure(delegates: Seq[ActorRef]): Int = + delegates count { _.mailboxSize > pressureThreshold } } /** * Returns the number of delegates required to respond to the number of pending futures */ -trait ActiveFuturesPressureCapacitor -{ - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.senderFuture.isDefined) n+=1} - n - } +trait ActiveFuturesPressureCapacitor { + def pressure(delegates: Seq[ActorRef]): Int = + delegates count { _.senderFuture.isDefined } } /** */ -trait CapacityStrategy -{ - import ActorPool._ - - def pressure(delegates:Seq[ActorRef]):Int - def filter(pressure:Int, capacity:Int):Int - - protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size) +trait CapacityStrategy { + import ActorPool._ + + def pressure(delegates: Seq[ActorRef]): Int + def filter(pressure: Int, capacity: Int): Int + + protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size) } trait FixedCapacityStrategy extends FixedSizeCapacitor @@ -274,20 +234,15 @@ trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor /** * The basic filter trait that composes ramp-up and and back-off subfiltering. */ -trait Filter -{ - def rampup(pressure:Int, capacity:Int):Int - def backoff(pressure:Int, capacity:Int):Int +trait Filter { + def rampup(pressure: Int, capacity: Int): Int + def backoff(pressure: Int, capacity: Int): Int - def filter(pressure:Int, capacity:Int):Int = - { - // - // pass through both filters just to be sure any internal counters - // are updated consistently. ramping up is always + and backing off - // is always - and each should return 0 otherwise... - // - rampup (pressure, capacity) + backoff (pressure, capacity) - } + // pass through both filters just to be sure any internal counters + // are updated consistently. ramping up is always + and backing off + // is always - and each should return 0 otherwise... + def filter(pressure: Int, capacity: Int): Int = + rampup (pressure, capacity) + backoff (pressure, capacity) } trait BasicFilter extends Filter with BasicRampup with BasicBackoff @@ -295,40 +250,29 @@ trait BasicFilter extends Filter with BasicRampup with BasicBackoff /** * Filter performs steady incremental growth using only the basic ramp-up subfilter */ -trait BasicNoBackoffFilter extends BasicRampup -{ - def filter(pressure:Int, capacity:Int):Int = rampup(pressure, capacity) +trait BasicNoBackoffFilter extends BasicRampup { + def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) } /** * Basic incremental growth as a percentage of the current pool capacity */ -trait BasicRampup -{ - def rampupRate:Double +trait BasicRampup { + def rampupRate: Double - def rampup(pressure:Int, capacity:Int):Int = - { - if (pressure < capacity) 0 - else math.ceil(rampupRate * capacity) toInt - } + def rampup(pressure: Int, capacity: Int): Int = + if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt } /** * Basic decrement as a percentage of the current pool capacity */ -trait BasicBackoff -{ - def backoffThreshold:Double - def backoffRate:Double +trait BasicBackoff { + def backoffThreshold: Double + def backoffRate: Double - def backoff(pressure:Int, capacity:Int):Int = - { - if (capacity > 0 && pressure/capacity < backoffThreshold) - math.ceil(-1.0 * backoffRate * capacity) toInt - else - 0 - } + def backoff(pressure: Int, capacity: Int): Int = + if (capacity > 0 && pressure / capacity < backoffThreshold) math.ceil(-1.0 * backoffRate * capacity) toInt else 0 } /** * This filter tracks the average pressure over the lifetime of the pool (or since last reset) and @@ -336,29 +280,23 @@ trait BasicBackoff * delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied * by the difference in capacity and pressure. */ -trait RunningMeanBackoff -{ - def backoffThreshold:Double - def backoffRate:Double +trait RunningMeanBackoff { + def backoffThreshold: Double + def backoffRate: Double - private var _pressure:Double = 0.0 - private var _capacity:Double = 0.0 + private var _pressure: Double = 0.0 + private var _capacity: Double = 0.0 - def backoff(pressure:Int, capacity:Int):Int = - { + def backoff(pressure: Int, capacity: Int): Int = { _pressure += pressure _capacity += capacity - if (capacity > 0 && pressure/capacity < backoffThreshold && - _capacity > 0 && _pressure/_capacity < backoffThreshold) { + if (capacity > 0 && pressure / capacity < backoffThreshold && _capacity > 0 && _pressure / _capacity < backoffThreshold) math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt - } - else - 0 + else 0 } - def backoffReset = - { + def backoffReset = { _pressure - 0.0 _capacity = 0.0 }