Refactoring, reformatting and fixes to ActorPool, including ticket 705

This commit is contained in:
Viktor Klang 2011-03-16 13:20:00 +01:00
parent fadd30e96b
commit f7e215c1be
2 changed files with 307 additions and 268 deletions

View file

@ -4,7 +4,7 @@
package akka.routing
import akka.actor.{Actor, ActorRef, EventHandler}
import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill}
import java.util.concurrent.TimeUnit
/**
@ -52,11 +52,10 @@ trait DefaultActorPool extends ActorPool { this: Actor =>
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
protected var _delegates = LinkedList[ActorRef]()
protected var _delegates = Vector[ActorRef]()
private var _lastCapacityChange = 0
private var _lastSelectorCount = 0
override def postStop = _delegates foreach {_ stop}
protected def _route(): Receive = {
@ -70,24 +69,29 @@ trait DefaultActorPool extends ActorPool { this: Actor =>
_select() foreach { _ forward msg }
}
private def _capacity() = {
_lastCapacityChange = capacity(_delegates)
if (_lastCapacityChange > 0) {
_delegates ++= {
for (i <- 0 until _lastCapacityChange) yield {
private def _capacity() {
val requestedCapacity = capacity(_delegates)
val newDelegates = requestedCapacity match {
case qty if qty > 0 =>
_delegates ++ { for (i <- 0 until requestedCapacity) yield {
val delegate = instance()
self startLink delegate
delegate
}
}
case qty if qty < 0 =>
_delegates.splitAt(_delegates.length + requestedCapacity) match {
case (keep, abandon) =>
abandon foreach { _ ! PoisonPill }
keep
}
case _ => _delegates //No change
}
else if (_lastCapacityChange < 0) {
_delegates splitAt(_delegates.length + _lastCapacityChange) match {
case (keep, abandon) =>
abandon foreach { _.stop }
_delegates = keep
}
}
_lastCapacityChange = requestedCapacity
_delegates = newDelegates
}
private def _select() = select(_delegates) match {

View file

@ -226,277 +226,312 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
pool stop
}
@Test def testTicket705 = {
val actorPool = actorOf(new Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with BasicFilter {
//with BasicNoBackoffFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
def receive = {
case req: String => {
Thread.sleep(10L)
self.reply_?("Response")
}
}
})
}).start
try {
(for(count <- 1 to 500) yield actorPool.!!![String]("Test", 20000)) foreach {
_.await.resultOrException.get must be ("Response")
}
} finally {
actorPool.stop
}
}
//
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
//
@Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = {
//
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
//
@Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = {
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with ActiveFuturesPressureCapacitor
with SmallestMailboxSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
latch.countDown
}
})
def lowerBound = 2
def upperBound = 4
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}
//
// first message should create the minimum number of delgates
//
val pool = actorOf(new TestPool).start
pool ! 1
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
for (m <- 0 until loops) {
pool !!! t
Thread.sleep(50)
}
}
//
// 2 more should go thru w/out triggering more
//
loops = 2
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
//
// a whole bunch should max it out
//
loops = 10
loop(500)
done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4)
pool stop
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with ActiveFuturesPressureCapacitor
with SmallestMailboxSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
latch.countDown
}
})
//
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
//
@Test def testBoundedCapacityActorPoolWithMailboxPressure = {
def lowerBound = 2
def upperBound = 4
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
latch.countDown
}
})
//
// first message should create the minimum number of delgates
//
val pool = actorOf(new TestPool).start
pool ! 1
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
def lowerBound = 2
def upperBound = 4
def pressureThreshold = 3
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
for (m <- 0 until loops) {
pool !!! t
Thread.sleep(50)
}
}
val pool = actorOf(new TestPool).start
//
// 2 more should go thru w/out triggering more
//
loops = 2
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
for (m <- 0 until loops) {
pool ! t
}
}
//
// send a few messages and observe pool at its lower bound
//
loops = 3
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
//
// a whole bunch should max it out
//
loops = 10
loop(500)
//
// send a bunch over the theshold and observe an increment
//
loops = 15
loop(500)
done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4)
done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3)
pool stop
pool stop
}
//
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of messages in the delegate mailboxes
//
@Test def testBoundedCapacityActorPoolWithMailboxPressure = {
var latch = new CountDownLatch(3)
val counter = new AtomicInteger(0)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
counter.incrementAndGet
latch.countDown
}
// Actor Pool Selector Tests
@Test def testRoundRobinSelector = {
})
var latch = new CountDownLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
class TestPool1 extends Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown
}
})
def limit = 1
def selectionCount = 2
def rampupRate = 0.1
def partialFill = true
def instance = factory
def receive = _route
}
val pool1 = actorOf(new TestPool1).start
pool1 ! "a"
pool1 ! "b"
var done = latch.await(1,TimeUnit.SECONDS)
done must be (true)
delegates.size must be (1)
pool1 stop
class TestPool2 extends Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown
}
})
def limit = 2
def selectionCount = 2
def rampupRate = 0.1
def partialFill = false
def instance = factory
def receive = _route
}
def lowerBound = 2
def upperBound = 4
def pressureThreshold = 3
def rampupRate = 0.1
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
}
latch = new CountDownLatch(2)
delegates clear
val pool2 = actorOf(new TestPool2).start
pool2 ! "a"
pool2 ! "b"
done = latch.await(1,TimeUnit.SECONDS)
done must be (true)
delegates.size must be (2)
pool2 stop
val pool = actorOf(new TestPool).start
var loops = 0
def loop(t:Int) = {
latch = new CountDownLatch(loops)
counter.set(0)
for (m <- 0 until loops) {
pool ! t
}
}
//
// send a few messages and observe pool at its lower bound
//
loops = 3
loop(500)
var done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2)
//
// send a bunch over the theshold and observe an increment
//
loops = 15
loop(500)
done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
counter.get must be (loops)
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3)
pool stop
}
// Actor Pool Filter Tests
//
// reuse previous test to max pool then observe filter reducing capacity over time
//
@Test def testBoundedCapacityActorPoolWithBackoffFilter = {
// Actor Pool Selector Tests
var latch = new CountDownLatch(10)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with Filter
with RunningMeanBackoff
with BasicRampup
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
latch.countDown
}
})
@Test def testRoundRobinSelector = {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = factory
def receive = _route
}
var latch = new CountDownLatch(2)
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
//
// put some pressure on the pool
//
val pool = actorOf(new TestPool).start
for (m <- 0 to 10) pool ! 250
Thread.sleep(5)
val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size
z must be >= (2)
var done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
//
//
//
for (m <- 0 to 3) {
pool ! 1
Thread.sleep(500)
}
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z)
pool stop
class TestPool1 extends Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown
}
})
def limit = 1
def selectionCount = 2
def rampupRate = 0.1
def partialFill = true
def instance = factory
def receive = _route
}
val pool1 = actorOf(new TestPool1).start
pool1 ! "a"
pool1 ! "b"
var done = latch.await(1,TimeUnit.SECONDS)
done must be (true)
delegates.size must be (1)
pool1 stop
class TestPool2 extends Actor with DefaultActorPool
with FixedCapacityStrategy
with RoundRobinSelector
with BasicNoBackoffFilter
{
def factory = actorOf(new Actor {
def receive = {
case _ =>
delegates put(self.uuid.toString, "")
latch.countDown
}
})
def limit = 2
def selectionCount = 2
def rampupRate = 0.1
def partialFill = false
def instance = factory
def receive = _route
}
latch = new CountDownLatch(2)
delegates clear
val pool2 = actorOf(new TestPool2).start
pool2 ! "a"
pool2 ! "b"
done = latch.await(1,TimeUnit.SECONDS)
done must be (true)
delegates.size must be (2)
pool2 stop
}
// Actor Pool Filter Tests
//
// reuse previous test to max pool then observe filter reducing capacity over time
//
@Test def testBoundedCapacityActorPoolWithBackoffFilter = {
var latch = new CountDownLatch(10)
class TestPool extends Actor with DefaultActorPool
with BoundedCapacityStrategy
with MailboxPressureCapacitor
with SmallestMailboxSelector
with Filter
with RunningMeanBackoff
with BasicRampup
{
def factory = actorOf(new Actor {
def receive = {
case n:Int =>
Thread.sleep(n)
latch.countDown
}
})
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = factory
def receive = _route
}
//
// put some pressure on the pool
//
val pool = actorOf(new TestPool).start
for (m <- 0 to 10) pool ! 250
Thread.sleep(5)
val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size
z must be >= (2)
var done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
//
//
//
for (m <- 0 to 3) {
pool ! 1
Thread.sleep(500)
}
(pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z)
pool stop
}
}