tabs to spaces
This commit is contained in:
parent
1425267a65
commit
6e1b7958fe
3 changed files with 447 additions and 447 deletions
0
akka-actor/src/main/scala/akka/actor/FSM.scala
Executable file → Normal file
0
akka-actor/src/main/scala/akka/actor/FSM.scala
Executable file → Normal file
|
|
@ -14,8 +14,8 @@ import akka.actor. {Actor, ActorRef}
|
||||||
*
|
*
|
||||||
* Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
|
* Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
|
||||||
* Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types.
|
* Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types.
|
||||||
* The first determines the size itself - either fixed or bounded.
|
* The first determines the size itself - either fixed or bounded.
|
||||||
* The second determines how to adjust of the pool according to some internal pressure characteristic.
|
* The second determines how to adjust of the pool according to some internal pressure characteristic.
|
||||||
* Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
|
* Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
|
||||||
*
|
*
|
||||||
* It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say
|
* It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say
|
||||||
|
|
@ -27,8 +27,8 @@ import akka.actor. {Actor, ActorRef}
|
||||||
|
|
||||||
object ActorPool
|
object ActorPool
|
||||||
{
|
{
|
||||||
case object Stat
|
case object Stat
|
||||||
case class Stats(size:Int)
|
case class Stats(size:Int)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -36,91 +36,91 @@ object ActorPool
|
||||||
*/
|
*/
|
||||||
trait ActorPool
|
trait ActorPool
|
||||||
{
|
{
|
||||||
def instance():ActorRef
|
def instance():ActorRef
|
||||||
def capacity(delegates:Seq[ActorRef]):Int
|
def capacity(delegates:Seq[ActorRef]):Int
|
||||||
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int]
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A default implementation of a pool, on each message to route,
|
* A default implementation of a pool, on each message to route,
|
||||||
* - checks the current capacity and adjusts accordingly if needed
|
* - checks the current capacity and adjusts accordingly if needed
|
||||||
* - routes the incoming message to a selection set of delegate actors
|
* - routes the incoming message to a selection set of delegate actors
|
||||||
*/
|
*/
|
||||||
trait DefaultActorPool extends ActorPool
|
trait DefaultActorPool extends ActorPool
|
||||||
{
|
{
|
||||||
this: Actor =>
|
this: Actor =>
|
||||||
|
|
||||||
import ActorPool._
|
import ActorPool._
|
||||||
import collection.mutable.LinkedList
|
import collection.mutable.LinkedList
|
||||||
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
|
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
|
||||||
|
|
||||||
|
|
||||||
protected var _delegates = LinkedList[ActorRef]()
|
protected var _delegates = LinkedList[ActorRef]()
|
||||||
private var _lastCapacityChange = 0
|
private var _lastCapacityChange = 0
|
||||||
private var _lastSelectorCount = 0
|
private var _lastSelectorCount = 0
|
||||||
|
|
||||||
|
|
||||||
override def postStop = _delegates foreach {_ stop}
|
override def postStop = _delegates foreach {_ stop}
|
||||||
|
|
||||||
protected def _route:Receive =
|
protected def _route:Receive =
|
||||||
{
|
{
|
||||||
//
|
//
|
||||||
// for testing...
|
// for testing...
|
||||||
//
|
//
|
||||||
case Stat =>
|
case Stat =>
|
||||||
self reply_? Stats(_delegates length)
|
self reply_? Stats(_delegates length)
|
||||||
|
|
||||||
case max:MaximumNumberOfRestartsWithinTimeRangeReached =>
|
case max:MaximumNumberOfRestartsWithinTimeRangeReached =>
|
||||||
_delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)}
|
_delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)}
|
||||||
|
|
||||||
case msg =>
|
case msg =>
|
||||||
_capacity
|
_capacity
|
||||||
_select foreach {delegate =>
|
_select foreach {delegate =>
|
||||||
self.senderFuture match {
|
self.senderFuture match {
|
||||||
case None => delegate ! msg
|
case None => delegate ! msg
|
||||||
case Some(future) =>
|
case Some(future) =>
|
||||||
Actor.spawn {
|
Actor.spawn {
|
||||||
try {
|
try {
|
||||||
future completeWithResult (delegate !! msg).getOrElse(None)
|
future completeWithResult (delegate !! msg).getOrElse(None)
|
||||||
} catch {
|
} catch {
|
||||||
case ex => future completeWithException ex
|
case ex => future completeWithException ex
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def _capacity =
|
private def _capacity =
|
||||||
{
|
{
|
||||||
_lastCapacityChange = capacity(_delegates)
|
_lastCapacityChange = capacity(_delegates)
|
||||||
if (_lastCapacityChange > 0) {
|
if (_lastCapacityChange > 0) {
|
||||||
_delegates ++= {
|
_delegates ++= {
|
||||||
for (i <- 0 until _lastCapacityChange) yield {
|
for (i <- 0 until _lastCapacityChange) yield {
|
||||||
val delegate = instance()
|
val delegate = instance()
|
||||||
self startLink delegate
|
self startLink delegate
|
||||||
delegate
|
delegate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (_lastCapacityChange < 0) {
|
else if (_lastCapacityChange < 0) {
|
||||||
val s = _delegates splitAt(_delegates.length + _lastCapacityChange)
|
val s = _delegates splitAt(_delegates.length + _lastCapacityChange)
|
||||||
s._2 foreach {_ stop}
|
s._2 foreach {_ stop}
|
||||||
_delegates = s._1
|
_delegates = s._1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def _select =
|
private def _select =
|
||||||
{
|
{
|
||||||
val s = select(_delegates)
|
val s = select(_delegates)
|
||||||
_lastSelectorCount = s._2
|
_lastSelectorCount = s._2
|
||||||
s._1
|
s._1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Selectors
|
* Selectors
|
||||||
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
|
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
|
||||||
**/
|
**/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -128,24 +128,24 @@ trait DefaultActorPool extends ActorPool
|
||||||
*/
|
*/
|
||||||
trait SmallestMailboxSelector
|
trait SmallestMailboxSelector
|
||||||
{
|
{
|
||||||
def selectionCount:Int
|
def selectionCount:Int
|
||||||
def partialFill:Boolean
|
def partialFill:Boolean
|
||||||
|
|
||||||
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
||||||
{
|
{
|
||||||
var set:Seq[ActorRef] = Nil
|
var set:Seq[ActorRef] = Nil
|
||||||
var take = {
|
var take = {
|
||||||
if (partialFill) math.min(selectionCount, delegates.length)
|
if (partialFill) math.min(selectionCount, delegates.length)
|
||||||
else selectionCount
|
else selectionCount
|
||||||
}
|
}
|
||||||
|
|
||||||
while (take > 0) {
|
while (take > 0) {
|
||||||
set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set
|
set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set
|
||||||
take -= set.size
|
take -= set.size
|
||||||
}
|
}
|
||||||
|
|
||||||
(set.iterator, set.size)
|
(set.iterator, set.size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -153,33 +153,33 @@ trait SmallestMailboxSelector
|
||||||
*/
|
*/
|
||||||
trait RoundRobinSelector
|
trait RoundRobinSelector
|
||||||
{
|
{
|
||||||
private var _last:Int = -1;
|
private var _last:Int = -1;
|
||||||
|
|
||||||
def selectionCount:Int
|
def selectionCount:Int
|
||||||
def partialFill:Boolean
|
def partialFill:Boolean
|
||||||
|
|
||||||
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] =
|
||||||
{
|
{
|
||||||
val length = delegates.length
|
val length = delegates.length
|
||||||
val take = {
|
val take = {
|
||||||
if (partialFill) math.min(selectionCount, length)
|
if (partialFill) math.min(selectionCount, length)
|
||||||
else selectionCount
|
else selectionCount
|
||||||
}
|
}
|
||||||
|
|
||||||
var set = for (i <- 0 to take) yield {
|
var set = for (i <- 0 to take) yield {
|
||||||
_last += 1
|
_last += 1
|
||||||
if (_last >= length) _last = 0
|
if (_last >= length) _last = 0
|
||||||
delegates(_last)
|
delegates(_last)
|
||||||
}
|
}
|
||||||
|
|
||||||
(set.iterator, set.size)
|
(set.iterator, set.size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Capacitors
|
* Capacitors
|
||||||
* These traits define how to alter the size of the pool
|
* These traits define how to alter the size of the pool
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -187,14 +187,14 @@ trait RoundRobinSelector
|
||||||
*/
|
*/
|
||||||
trait FixedSizeCapacitor
|
trait FixedSizeCapacitor
|
||||||
{
|
{
|
||||||
def limit:Int
|
def limit:Int
|
||||||
|
|
||||||
def capacity(delegates:Seq[ActorRef]):Int =
|
def capacity(delegates:Seq[ActorRef]):Int =
|
||||||
{
|
{
|
||||||
val d = limit - delegates.size
|
val d = limit - delegates.size
|
||||||
if (d>0) d
|
if (d>0) d
|
||||||
else 0
|
else 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -202,22 +202,22 @@ trait FixedSizeCapacitor
|
||||||
*/
|
*/
|
||||||
trait BoundedCapacitor
|
trait BoundedCapacitor
|
||||||
{
|
{
|
||||||
def lowerBound:Int
|
def lowerBound:Int
|
||||||
def upperBound:Int
|
def upperBound:Int
|
||||||
|
|
||||||
def capacity(delegates:Seq[ActorRef]):Int =
|
def capacity(delegates:Seq[ActorRef]):Int =
|
||||||
{
|
{
|
||||||
val current = delegates length
|
val current = delegates length
|
||||||
var delta = _eval(delegates)
|
var delta = _eval(delegates)
|
||||||
val proposed = current + delta
|
val proposed = current + delta
|
||||||
|
|
||||||
if (proposed < lowerBound) delta += (lowerBound - proposed)
|
if (proposed < lowerBound) delta += (lowerBound - proposed)
|
||||||
else if (proposed > upperBound) delta -= (proposed - upperBound)
|
else if (proposed > upperBound) delta -= (proposed - upperBound)
|
||||||
|
|
||||||
delta
|
delta
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def _eval(delegates:Seq[ActorRef]):Int
|
protected def _eval(delegates:Seq[ActorRef]):Int
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -225,14 +225,14 @@ trait BoundedCapacitor
|
||||||
*/
|
*/
|
||||||
trait MailboxPressureCapacitor
|
trait MailboxPressureCapacitor
|
||||||
{
|
{
|
||||||
def pressureThreshold:Int
|
def pressureThreshold:Int
|
||||||
|
|
||||||
def pressure(delegates:Seq[ActorRef]):Int =
|
def pressure(delegates:Seq[ActorRef]):Int =
|
||||||
{
|
{
|
||||||
var n = 0;
|
var n = 0;
|
||||||
delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1}
|
delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1}
|
||||||
n
|
n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -240,12 +240,12 @@ trait MailboxPressureCapacitor
|
||||||
*/
|
*/
|
||||||
trait ActiveFuturesPressureCapacitor
|
trait ActiveFuturesPressureCapacitor
|
||||||
{
|
{
|
||||||
def pressure(delegates:Seq[ActorRef]):Int =
|
def pressure(delegates:Seq[ActorRef]):Int =
|
||||||
{
|
{
|
||||||
var n = 0;
|
var n = 0;
|
||||||
delegates foreach {d => if (d.senderFuture.isDefined) n+=1}
|
delegates foreach {d => if (d.senderFuture.isDefined) n+=1}
|
||||||
n
|
n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -253,12 +253,12 @@ trait ActiveFuturesPressureCapacitor
|
||||||
*/
|
*/
|
||||||
trait CapacityStrategy
|
trait CapacityStrategy
|
||||||
{
|
{
|
||||||
import ActorPool._
|
import ActorPool._
|
||||||
|
|
||||||
def pressure(delegates:Seq[ActorRef]):Int
|
def pressure(delegates:Seq[ActorRef]):Int
|
||||||
def filter(pressure:Int, capacity:Int):Int
|
def filter(pressure:Int, capacity:Int):Int
|
||||||
|
|
||||||
protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size)
|
protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
trait FixedCapacityStrategy extends FixedSizeCapacitor
|
trait FixedCapacityStrategy extends FixedSizeCapacitor
|
||||||
|
|
@ -284,7 +284,7 @@ trait Filter
|
||||||
// are updated consistently. ramping up is always + and backing off
|
// are updated consistently. ramping up is always + and backing off
|
||||||
// is always - and each should return 0 otherwise...
|
// is always - and each should return 0 otherwise...
|
||||||
//
|
//
|
||||||
rampup (pressure, capacity) + backoff (pressure, capacity)
|
rampup (pressure, capacity) + backoff (pressure, capacity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -348,7 +348,7 @@ trait RunningMeanBackoff
|
||||||
_capacity += capacity
|
_capacity += capacity
|
||||||
|
|
||||||
if (capacity > 0 && pressure/capacity < backoffThreshold &&
|
if (capacity > 0 && pressure/capacity < backoffThreshold &&
|
||||||
_capacity > 0 && _pressure/_capacity < backoffThreshold) {
|
_capacity > 0 && _pressure/_capacity < backoffThreshold) {
|
||||||
math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt
|
math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
||||||
|
|
@ -180,315 +180,315 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
||||||
for(a <- List(t1,t2,d1,d2)) a.stop
|
for(a <- List(t1,t2,d1,d2)) a.stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Actor Pool Capacity Tests
|
// Actor Pool Capacity Tests
|
||||||
|
|
||||||
//
|
//
|
||||||
// make sure the pool is of the fixed, expected capacity
|
// make sure the pool is of the fixed, expected capacity
|
||||||
//
|
//
|
||||||
@Test def testFixedCapacityActorPool = {
|
@Test def testFixedCapacityActorPool = {
|
||||||
|
|
||||||
val latch = new CountDownLatch(2)
|
val latch = new CountDownLatch(2)
|
||||||
val counter = new AtomicInteger(0)
|
val counter = new AtomicInteger(0)
|
||||||
class TestPool extends Actor with DefaultActorPool
|
class TestPool extends Actor with DefaultActorPool
|
||||||
with FixedCapacityStrategy
|
with FixedCapacityStrategy
|
||||||
with SmallestMailboxSelector
|
with SmallestMailboxSelector
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ =>
|
case _ =>
|
||||||
counter.incrementAndGet
|
counter.incrementAndGet
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def limit = 2
|
def limit = 2
|
||||||
def selectionCount = 1
|
def selectionCount = 1
|
||||||
def partialFill = true
|
def partialFill = true
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
val pool = actorOf(new TestPool).start
|
val pool = actorOf(new TestPool).start
|
||||||
pool ! "a"
|
pool ! "a"
|
||||||
pool ! "b"
|
pool ! "b"
|
||||||
val done = latch.await(1,TimeUnit.SECONDS)
|
val done = latch.await(1,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
counter.get must be (2)
|
counter.get must be (2)
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
||||||
|
|
||||||
pool stop
|
pool stop
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
||||||
// as influenced by the backlog of blocking pooled actors
|
// as influenced by the backlog of blocking pooled actors
|
||||||
//
|
//
|
||||||
@Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = {
|
@Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = {
|
||||||
|
|
||||||
var latch = new CountDownLatch(3)
|
var latch = new CountDownLatch(3)
|
||||||
val counter = new AtomicInteger(0)
|
val counter = new AtomicInteger(0)
|
||||||
class TestPool extends Actor with DefaultActorPool
|
class TestPool extends Actor with DefaultActorPool
|
||||||
with BoundedCapacityStrategy
|
with BoundedCapacityStrategy
|
||||||
with ActiveFuturesPressureCapacitor
|
with ActiveFuturesPressureCapacitor
|
||||||
with SmallestMailboxSelector
|
with SmallestMailboxSelector
|
||||||
with BasicNoBackoffFilter
|
with BasicNoBackoffFilter
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case n:Int =>
|
case n:Int =>
|
||||||
Thread.sleep(n)
|
Thread.sleep(n)
|
||||||
counter.incrementAndGet
|
counter.incrementAndGet
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def lowerBound = 2
|
def lowerBound = 2
|
||||||
def upperBound = 4
|
def upperBound = 4
|
||||||
def rampupRate = 0.1
|
def rampupRate = 0.1
|
||||||
def partialFill = true
|
def partialFill = true
|
||||||
def selectionCount = 1
|
def selectionCount = 1
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// first message should create the minimum number of delgates
|
// first message should create the minimum number of delgates
|
||||||
//
|
//
|
||||||
val pool = actorOf(new TestPool).start
|
val pool = actorOf(new TestPool).start
|
||||||
pool ! 1
|
pool ! 1
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
||||||
|
|
||||||
var loops = 0
|
var loops = 0
|
||||||
def loop(t:Int) = {
|
def loop(t:Int) = {
|
||||||
latch = new CountDownLatch(loops)
|
latch = new CountDownLatch(loops)
|
||||||
counter.set(0)
|
counter.set(0)
|
||||||
for (m <- 0 until loops) {
|
for (m <- 0 until loops) {
|
||||||
pool !!! t
|
pool !!! t
|
||||||
Thread.sleep(50)
|
Thread.sleep(50)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// 2 more should go thru w/out triggering more
|
// 2 more should go thru w/out triggering more
|
||||||
//
|
//
|
||||||
loops = 2
|
loops = 2
|
||||||
loop(500)
|
loop(500)
|
||||||
var done = latch.await(5,TimeUnit.SECONDS)
|
var done = latch.await(5,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
counter.get must be (loops)
|
counter.get must be (loops)
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
||||||
|
|
||||||
//
|
//
|
||||||
// a whole bunch should max it out
|
// a whole bunch should max it out
|
||||||
//
|
//
|
||||||
loops = 10
|
loops = 10
|
||||||
loop(500)
|
loop(500)
|
||||||
|
|
||||||
done = latch.await(5,TimeUnit.SECONDS)
|
done = latch.await(5,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
counter.get must be (loops)
|
counter.get must be (loops)
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4)
|
||||||
|
|
||||||
pool stop
|
pool stop
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
||||||
// as influenced by the backlog of messages in the delegate mailboxes
|
// as influenced by the backlog of messages in the delegate mailboxes
|
||||||
//
|
//
|
||||||
@Test def testBoundedCapacityActorPoolWithMailboxPressure = {
|
@Test def testBoundedCapacityActorPoolWithMailboxPressure = {
|
||||||
|
|
||||||
var latch = new CountDownLatch(3)
|
var latch = new CountDownLatch(3)
|
||||||
val counter = new AtomicInteger(0)
|
val counter = new AtomicInteger(0)
|
||||||
class TestPool extends Actor with DefaultActorPool
|
class TestPool extends Actor with DefaultActorPool
|
||||||
with BoundedCapacityStrategy
|
with BoundedCapacityStrategy
|
||||||
with MailboxPressureCapacitor
|
with MailboxPressureCapacitor
|
||||||
with SmallestMailboxSelector
|
with SmallestMailboxSelector
|
||||||
with BasicNoBackoffFilter
|
with BasicNoBackoffFilter
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case n:Int =>
|
case n:Int =>
|
||||||
Thread.sleep(n)
|
Thread.sleep(n)
|
||||||
counter.incrementAndGet
|
counter.incrementAndGet
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def lowerBound = 2
|
def lowerBound = 2
|
||||||
def upperBound = 4
|
def upperBound = 4
|
||||||
def pressureThreshold = 3
|
def pressureThreshold = 3
|
||||||
def rampupRate = 0.1
|
def rampupRate = 0.1
|
||||||
def partialFill = true
|
def partialFill = true
|
||||||
def selectionCount = 1
|
def selectionCount = 1
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
val pool = actorOf(new TestPool).start
|
val pool = actorOf(new TestPool).start
|
||||||
|
|
||||||
var loops = 0
|
var loops = 0
|
||||||
def loop(t:Int) = {
|
def loop(t:Int) = {
|
||||||
latch = new CountDownLatch(loops)
|
latch = new CountDownLatch(loops)
|
||||||
counter.set(0)
|
counter.set(0)
|
||||||
for (m <- 0 until loops) {
|
for (m <- 0 until loops) {
|
||||||
pool ! t
|
pool ! t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// send a few messages and observe pool at its lower bound
|
// send a few messages and observe pool at its lower bound
|
||||||
//
|
//
|
||||||
loops = 3
|
loops = 3
|
||||||
loop(500)
|
loop(500)
|
||||||
var done = latch.await(5,TimeUnit.SECONDS)
|
var done = latch.await(5,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
counter.get must be (loops)
|
counter.get must be (loops)
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
|
||||||
|
|
||||||
//
|
//
|
||||||
// send a bunch over the theshold and observe an increment
|
// send a bunch over the theshold and observe an increment
|
||||||
//
|
//
|
||||||
loops = 15
|
loops = 15
|
||||||
loop(500)
|
loop(500)
|
||||||
|
|
||||||
done = latch.await(10,TimeUnit.SECONDS)
|
done = latch.await(10,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
counter.get must be (loops)
|
counter.get must be (loops)
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3)
|
||||||
|
|
||||||
pool stop
|
pool stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Actor Pool Selector Tests
|
// Actor Pool Selector Tests
|
||||||
|
|
||||||
@Test def testRoundRobinSelector = {
|
@Test def testRoundRobinSelector = {
|
||||||
|
|
||||||
var latch = new CountDownLatch(2)
|
var latch = new CountDownLatch(2)
|
||||||
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
|
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
|
||||||
|
|
||||||
class TestPool1 extends Actor with DefaultActorPool
|
class TestPool1 extends Actor with DefaultActorPool
|
||||||
with FixedCapacityStrategy
|
with FixedCapacityStrategy
|
||||||
with RoundRobinSelector
|
with RoundRobinSelector
|
||||||
with BasicNoBackoffFilter
|
with BasicNoBackoffFilter
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ =>
|
case _ =>
|
||||||
delegates put(self.uuid.toString, "")
|
delegates put(self.uuid.toString, "")
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def limit = 1
|
def limit = 1
|
||||||
def selectionCount = 2
|
def selectionCount = 2
|
||||||
def rampupRate = 0.1
|
def rampupRate = 0.1
|
||||||
def partialFill = true
|
def partialFill = true
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
val pool1 = actorOf(new TestPool1).start
|
val pool1 = actorOf(new TestPool1).start
|
||||||
pool1 ! "a"
|
pool1 ! "a"
|
||||||
pool1 ! "b"
|
pool1 ! "b"
|
||||||
var done = latch.await(1,TimeUnit.SECONDS)
|
var done = latch.await(1,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
delegates.size must be (1)
|
delegates.size must be (1)
|
||||||
pool1 stop
|
pool1 stop
|
||||||
|
|
||||||
class TestPool2 extends Actor with DefaultActorPool
|
class TestPool2 extends Actor with DefaultActorPool
|
||||||
with FixedCapacityStrategy
|
with FixedCapacityStrategy
|
||||||
with RoundRobinSelector
|
with RoundRobinSelector
|
||||||
with BasicNoBackoffFilter
|
with BasicNoBackoffFilter
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ =>
|
case _ =>
|
||||||
delegates put(self.uuid.toString, "")
|
delegates put(self.uuid.toString, "")
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def limit = 2
|
def limit = 2
|
||||||
def selectionCount = 2
|
def selectionCount = 2
|
||||||
def rampupRate = 0.1
|
def rampupRate = 0.1
|
||||||
def partialFill = false
|
def partialFill = false
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
latch = new CountDownLatch(2)
|
latch = new CountDownLatch(2)
|
||||||
delegates clear
|
delegates clear
|
||||||
|
|
||||||
val pool2 = actorOf(new TestPool2).start
|
val pool2 = actorOf(new TestPool2).start
|
||||||
pool2 ! "a"
|
pool2 ! "a"
|
||||||
pool2 ! "b"
|
pool2 ! "b"
|
||||||
done = latch.await(1,TimeUnit.SECONDS)
|
done = latch.await(1,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
delegates.size must be (2)
|
delegates.size must be (2)
|
||||||
pool2 stop
|
pool2 stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Actor Pool Filter Tests
|
// Actor Pool Filter Tests
|
||||||
|
|
||||||
//
|
//
|
||||||
// reuse previous test to max pool then observe filter reducing capacity over time
|
// reuse previous test to max pool then observe filter reducing capacity over time
|
||||||
//
|
//
|
||||||
@Test def testBoundedCapacityActorPoolWithBackoffFilter = {
|
@Test def testBoundedCapacityActorPoolWithBackoffFilter = {
|
||||||
|
|
||||||
var latch = new CountDownLatch(10)
|
var latch = new CountDownLatch(10)
|
||||||
class TestPool extends Actor with DefaultActorPool
|
class TestPool extends Actor with DefaultActorPool
|
||||||
with BoundedCapacityStrategy
|
with BoundedCapacityStrategy
|
||||||
with MailboxPressureCapacitor
|
with MailboxPressureCapacitor
|
||||||
with SmallestMailboxSelector
|
with SmallestMailboxSelector
|
||||||
with Filter
|
with Filter
|
||||||
with RunningMeanBackoff
|
with RunningMeanBackoff
|
||||||
with BasicRampup
|
with BasicRampup
|
||||||
{
|
{
|
||||||
def factory = actorOf(new Actor {
|
def factory = actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case n:Int =>
|
case n:Int =>
|
||||||
Thread.sleep(n)
|
Thread.sleep(n)
|
||||||
latch.countDown
|
latch.countDown
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
def lowerBound = 1
|
def lowerBound = 1
|
||||||
def upperBound = 5
|
def upperBound = 5
|
||||||
def pressureThreshold = 1
|
def pressureThreshold = 1
|
||||||
def partialFill = true
|
def partialFill = true
|
||||||
def selectionCount = 1
|
def selectionCount = 1
|
||||||
def rampupRate = 0.1
|
def rampupRate = 0.1
|
||||||
def backoffRate = 0.50
|
def backoffRate = 0.50
|
||||||
def backoffThreshold = 0.50
|
def backoffThreshold = 0.50
|
||||||
def instance = factory
|
def instance = factory
|
||||||
def receive = _route
|
def receive = _route
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// put some pressure on the pool
|
// put some pressure on the pool
|
||||||
//
|
//
|
||||||
val pool = actorOf(new TestPool).start
|
val pool = actorOf(new TestPool).start
|
||||||
for (m <- 0 to 10) pool ! 250
|
for (m <- 0 to 10) pool ! 250
|
||||||
Thread.sleep(5)
|
Thread.sleep(5)
|
||||||
val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size
|
val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size
|
||||||
z must be >= (2)
|
z must be >= (2)
|
||||||
var done = latch.await(10,TimeUnit.SECONDS)
|
var done = latch.await(10,TimeUnit.SECONDS)
|
||||||
done must be (true)
|
done must be (true)
|
||||||
|
|
||||||
|
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
for (m <- 0 to 3) {
|
for (m <- 0 to 3) {
|
||||||
pool ! 1
|
pool ! 1
|
||||||
Thread.sleep(500)
|
Thread.sleep(500)
|
||||||
}
|
}
|
||||||
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z)
|
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z)
|
||||||
|
|
||||||
pool stop
|
pool stop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue