From f7e215c1bebb28c0e684909c40dd9e2c7c7d7dcb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 16 Mar 2011 13:20:00 +0100 Subject: [PATCH] Refactoring, reformatting and fixes to ActorPool, including ticket 705 --- .../src/main/scala/akka/routing/Pool.scala | 34 +- .../test/scala/akka/routing/RoutingSpec.scala | 541 ++++++++++-------- 2 files changed, 307 insertions(+), 268 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index fc42e6f50b..b60054a531 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -4,7 +4,7 @@ package akka.routing -import akka.actor.{Actor, ActorRef, EventHandler} +import akka.actor.{Actor, ActorRef, EventHandler, PoisonPill} import java.util.concurrent.TimeUnit /** @@ -52,11 +52,10 @@ trait DefaultActorPool extends ActorPool { this: Actor => import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached - protected var _delegates = LinkedList[ActorRef]() + protected var _delegates = Vector[ActorRef]() private var _lastCapacityChange = 0 private var _lastSelectorCount = 0 - override def postStop = _delegates foreach {_ stop} protected def _route(): Receive = { @@ -70,24 +69,29 @@ trait DefaultActorPool extends ActorPool { this: Actor => _select() foreach { _ forward msg } } - private def _capacity() = { - _lastCapacityChange = capacity(_delegates) - if (_lastCapacityChange > 0) { - _delegates ++= { - for (i <- 0 until _lastCapacityChange) yield { + private def _capacity() { + val requestedCapacity = capacity(_delegates) + val newDelegates = requestedCapacity match { + case qty if qty > 0 => + _delegates ++ { for (i <- 0 until requestedCapacity) yield { val delegate = instance() self startLink delegate delegate } } + + case qty if qty < 0 => + _delegates.splitAt(_delegates.length + requestedCapacity) match { + case (keep, abandon) => + abandon foreach { _ ! PoisonPill } + keep + } + + case _ => _delegates //No change } - else if (_lastCapacityChange < 0) { - _delegates splitAt(_delegates.length + _lastCapacityChange) match { - case (keep, abandon) => - abandon foreach { _.stop } - _delegates = keep - } - } + + _lastCapacityChange = requestedCapacity + _delegates = newDelegates } private def _select() = select(_delegates) match { diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala index 1be34a41bc..09e618e24c 100644 --- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala @@ -226,277 +226,312 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers pool stop } + + @Test def testTicket705 = { + + val actorPool = actorOf(new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicFilter { + //with BasicNoBackoffFilter { + def lowerBound = 2 + def upperBound = 20 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + def receive = { + case req: String => { + Thread.sleep(10L) + self.reply_?("Response") + } + } + }) + }).start + + try { + (for(count <- 1 to 500) yield actorPool.!!![String]("Test", 20000)) foreach { + _.await.resultOrException.get must be ("Response") + } + } finally { + actorPool.stop + } + } - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - // - @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + // + @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with ActiveFuturesPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) - - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } - - // - // first message should create the minimum number of delgates - // - val pool = actorOf(new TestPool).start - pool ! 1 - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool !!! t - Thread.sleep(50) - } - } - - // - // 2 more should go thru w/out triggering more - // - loops = 2 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - // - // a whole bunch should max it out - // - loops = 10 - loop(500) - - done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - - pool stop + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with ActiveFuturesPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown } + }) - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - // - @Test def testBoundedCapacityActorPoolWithMailboxPressure = { + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) + // + // first message should create the minimum number of delgates + // + val pool = actorOf(new TestPool).start + pool ! 1 + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool !!! t + Thread.sleep(50) + } + } - val pool = actorOf(new TestPool).start + // + // 2 more should go thru w/out triggering more + // + loops = 2 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool ! t - } - } - - // - // send a few messages and observe pool at its lower bound - // - loops = 3 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + // + // a whole bunch should max it out + // + loops = 10 + loop(500) - // - // send a bunch over the theshold and observe an increment - // - loops = 15 - loop(500) + done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) - - pool stop + pool stop + } + + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + // + @Test def testBoundedCapacityActorPoolWithMailboxPressure = { + + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown } - - // Actor Pool Selector Tests - - @Test def testRoundRobinSelector = { + }) - var latch = new CountDownLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - class TestPool1 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 1 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool1 = actorOf(new TestPool1).start - pool1 ! "a" - pool1 ! "b" - var done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (1) - pool1 stop - - class TestPool2 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route - } + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - latch = new CountDownLatch(2) - delegates clear - - val pool2 = actorOf(new TestPool2).start - pool2 ! "a" - pool2 ! "b" - done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (2) - pool2 stop + val pool = actorOf(new TestPool).start + + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool ! t } + } + + // + // send a few messages and observe pool at its lower bound + // + loops = 3 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + // + // send a bunch over the theshold and observe an increment + // + loops = 15 + loop(500) + + done = latch.await(10,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) + + pool stop + } - // Actor Pool Filter Tests - - // - // reuse previous test to max pool then observe filter reducing capacity over time - // - @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + // Actor Pool Selector Tests - var latch = new CountDownLatch(10) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with Filter - with RunningMeanBackoff - with BasicRampup - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - latch.countDown - } - }) + @Test def testRoundRobinSelector = { - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route - } + var latch = new CountDownLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - // - // put some pressure on the pool - // - val pool = actorOf(new TestPool).start - for (m <- 0 to 10) pool ! 250 - Thread.sleep(5) - val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size - z must be >= (2) - var done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - - - // - // - // - for (m <- 0 to 3) { - pool ! 1 - Thread.sleep(500) - } - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) - - pool stop + class TestPool1 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown } + }) + + def limit = 1 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + } + + val pool1 = actorOf(new TestPool1).start + pool1 ! "a" + pool1 ! "b" + var done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (1) + pool1 stop + + class TestPool2 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown + } + }) + + def limit = 2 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + } + + latch = new CountDownLatch(2) + delegates clear + + val pool2 = actorOf(new TestPool2).start + pool2 ! "a" + pool2 ! "b" + done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (2) + pool2 stop + } - + // Actor Pool Filter Tests + + // + // reuse previous test to max pool then observe filter reducing capacity over time + // + @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + + var latch = new CountDownLatch(10) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with Filter + with RunningMeanBackoff + with BasicRampup + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + latch.countDown + } + }) + + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + } + + + // + // put some pressure on the pool + // + val pool = actorOf(new TestPool).start + for (m <- 0 to 10) pool ! 250 + Thread.sleep(5) + val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size + z must be >= (2) + var done = latch.await(10,TimeUnit.SECONDS) + done must be (true) + + + // + // + // + for (m <- 0 to 3) { + pool ! 1 + Thread.sleep(500) + } + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) + + pool stop + } }