From 6e1b7958fe997ec5ed337ed2f5f40a866f221f7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 28 Feb 2011 22:55:02 +0100 Subject: [PATCH] tabs to spaces --- .../src/main/scala/akka/actor/FSM.scala | 0 .../src/main/scala/akka/routing/Pool.scala | 322 +++++----- .../test/scala/akka/routing/RoutingSpec.scala | 572 +++++++++--------- 3 files changed, 447 insertions(+), 447 deletions(-) mode change 100755 => 100644 akka-actor/src/main/scala/akka/actor/FSM.scala diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala old mode 100755 new mode 100644 diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 1638b0947c..3e326e8c92 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -14,8 +14,8 @@ import akka.actor. {Actor, ActorRef} * * Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message. * Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types. - * The first determines the size itself - either fixed or bounded. - * The second determines how to adjust of the pool according to some internal pressure characteristic. + * The first determines the size itself - either fixed or bounded. + * The second determines how to adjust of the pool according to some internal pressure characteristic. * Filters - A filter can be used to refine the raw pressure value returned from a capacitor. * * It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say @@ -27,8 +27,8 @@ import akka.actor. {Actor, ActorRef} object ActorPool { - case object Stat - case class Stats(size:Int) + case object Stat + case class Stats(size:Int) } /** @@ -36,91 +36,91 @@ object ActorPool */ trait ActorPool { - def instance():ActorRef - def capacity(delegates:Seq[ActorRef]):Int - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] + def instance():ActorRef + def capacity(delegates:Seq[ActorRef]):Int + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] } /** * A default implementation of a pool, on each message to route, - * - checks the current capacity and adjusts accordingly if needed - * - routes the incoming message to a selection set of delegate actors + * - checks the current capacity and adjusts accordingly if needed + * - routes the incoming message to a selection set of delegate actors */ trait DefaultActorPool extends ActorPool { - this: Actor => - - import ActorPool._ - import collection.mutable.LinkedList - import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached - - - protected var _delegates = LinkedList[ActorRef]() - private var _lastCapacityChange = 0 - private var _lastSelectorCount = 0 - - - override def postStop = _delegates foreach {_ stop} - - protected def _route:Receive = - { - // - // for testing... - // - case Stat => - self reply_? Stats(_delegates length) - - case max:MaximumNumberOfRestartsWithinTimeRangeReached => - _delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)} + this: Actor => + + import ActorPool._ + import collection.mutable.LinkedList + import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached + + + protected var _delegates = LinkedList[ActorRef]() + private var _lastCapacityChange = 0 + private var _lastSelectorCount = 0 + + + override def postStop = _delegates foreach {_ stop} + + protected def _route:Receive = + { + // + // for testing... + // + case Stat => + self reply_? Stats(_delegates length) + + case max:MaximumNumberOfRestartsWithinTimeRangeReached => + _delegates = _delegates filter {delegate => (delegate.uuid != max.victim.uuid)} - case msg => - _capacity - _select foreach {delegate => - self.senderFuture match { - case None => delegate ! msg - case Some(future) => - Actor.spawn { - try { - future completeWithResult (delegate !! msg).getOrElse(None) - } catch { - case ex => future completeWithException ex - } - } - } - } - } - - private def _capacity = - { - _lastCapacityChange = capacity(_delegates) - if (_lastCapacityChange > 0) { - _delegates ++= { - for (i <- 0 until _lastCapacityChange) yield { - val delegate = instance() - self startLink delegate - delegate - } - } - } - else if (_lastCapacityChange < 0) { - val s = _delegates splitAt(_delegates.length + _lastCapacityChange) - s._2 foreach {_ stop} - _delegates = s._1 - } - } - - private def _select = - { - val s = select(_delegates) - _lastSelectorCount = s._2 - s._1 - } + case msg => + _capacity + _select foreach {delegate => + self.senderFuture match { + case None => delegate ! msg + case Some(future) => + Actor.spawn { + try { + future completeWithResult (delegate !! msg).getOrElse(None) + } catch { + case ex => future completeWithException ex + } + } + } + } + } + + private def _capacity = + { + _lastCapacityChange = capacity(_delegates) + if (_lastCapacityChange > 0) { + _delegates ++= { + for (i <- 0 until _lastCapacityChange) yield { + val delegate = instance() + self startLink delegate + delegate + } + } + } + else if (_lastCapacityChange < 0) { + val s = _delegates splitAt(_delegates.length + _lastCapacityChange) + s._2 foreach {_ stop} + _delegates = s._1 + } + } + + private def _select = + { + val s = select(_delegates) + _lastSelectorCount = s._2 + s._1 + } } /** * Selectors - * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool + * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool **/ /** @@ -128,24 +128,24 @@ trait DefaultActorPool extends ActorPool */ trait SmallestMailboxSelector { - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - var set:Seq[ActorRef] = Nil - var take = { - if (partialFill) math.min(selectionCount, delegates.length) - else selectionCount - } - - while (take > 0) { - set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set - take -= set.size - } + def selectionCount:Int + def partialFill:Boolean + + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = + { + var set:Seq[ActorRef] = Nil + var take = { + if (partialFill) math.min(selectionCount, delegates.length) + else selectionCount + } + + while (take > 0) { + set = delegates.sortWith((a,b) => a.mailboxSize < b.mailboxSize).take(take) ++ set + take -= set.size + } - (set.iterator, set.size) - } + (set.iterator, set.size) + } } /** @@ -153,33 +153,33 @@ trait SmallestMailboxSelector */ trait RoundRobinSelector { - private var _last:Int = -1; - - def selectionCount:Int - def partialFill:Boolean - - def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = - { - val length = delegates.length - val take = { - if (partialFill) math.min(selectionCount, length) - else selectionCount - } - - var set = for (i <- 0 to take) yield { - _last += 1 - if (_last >= length) _last = 0 - delegates(_last) - } - - (set.iterator, set.size) - } + private var _last:Int = -1; + + def selectionCount:Int + def partialFill:Boolean + + def select(delegates:Seq[ActorRef]):Tuple2[Iterator[ActorRef], Int] = + { + val length = delegates.length + val take = { + if (partialFill) math.min(selectionCount, length) + else selectionCount + } + + var set = for (i <- 0 to take) yield { + _last += 1 + if (_last >= length) _last = 0 + delegates(_last) + } + + (set.iterator, set.size) + } } /** * Capacitors - * These traits define how to alter the size of the pool + * These traits define how to alter the size of the pool */ /** @@ -187,14 +187,14 @@ trait RoundRobinSelector */ trait FixedSizeCapacitor { - def limit:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val d = limit - delegates.size - if (d>0) d - else 0 - } + def limit:Int + + def capacity(delegates:Seq[ActorRef]):Int = + { + val d = limit - delegates.size + if (d>0) d + else 0 + } } /** @@ -202,22 +202,22 @@ trait FixedSizeCapacitor */ trait BoundedCapacitor { - def lowerBound:Int - def upperBound:Int - - def capacity(delegates:Seq[ActorRef]):Int = - { - val current = delegates length - var delta = _eval(delegates) - val proposed = current + delta - - if (proposed < lowerBound) delta += (lowerBound - proposed) - else if (proposed > upperBound) delta -= (proposed - upperBound) - - delta - } - - protected def _eval(delegates:Seq[ActorRef]):Int + def lowerBound:Int + def upperBound:Int + + def capacity(delegates:Seq[ActorRef]):Int = + { + val current = delegates length + var delta = _eval(delegates) + val proposed = current + delta + + if (proposed < lowerBound) delta += (lowerBound - proposed) + else if (proposed > upperBound) delta -= (proposed - upperBound) + + delta + } + + protected def _eval(delegates:Seq[ActorRef]):Int } /** @@ -225,14 +225,14 @@ trait BoundedCapacitor */ trait MailboxPressureCapacitor { - def pressureThreshold:Int - - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1} - n - } + def pressureThreshold:Int + + def pressure(delegates:Seq[ActorRef]):Int = + { + var n = 0; + delegates foreach {d => if (d.mailboxSize > pressureThreshold) n+=1} + n + } } /** @@ -240,12 +240,12 @@ trait MailboxPressureCapacitor */ trait ActiveFuturesPressureCapacitor { - def pressure(delegates:Seq[ActorRef]):Int = - { - var n = 0; - delegates foreach {d => if (d.senderFuture.isDefined) n+=1} - n - } + def pressure(delegates:Seq[ActorRef]):Int = + { + var n = 0; + delegates foreach {d => if (d.senderFuture.isDefined) n+=1} + n + } } @@ -253,12 +253,12 @@ trait ActiveFuturesPressureCapacitor */ trait CapacityStrategy { - import ActorPool._ - - def pressure(delegates:Seq[ActorRef]):Int - def filter(pressure:Int, capacity:Int):Int - - protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size) + import ActorPool._ + + def pressure(delegates:Seq[ActorRef]):Int + def filter(pressure:Int, capacity:Int):Int + + protected def _eval(delegates:Seq[ActorRef]):Int = filter(pressure(delegates), delegates.size) } trait FixedCapacityStrategy extends FixedSizeCapacitor @@ -284,7 +284,7 @@ trait Filter // are updated consistently. ramping up is always + and backing off // is always - and each should return 0 otherwise... // - rampup (pressure, capacity) + backoff (pressure, capacity) + rampup (pressure, capacity) + backoff (pressure, capacity) } } @@ -348,7 +348,7 @@ trait RunningMeanBackoff _capacity += capacity if (capacity > 0 && pressure/capacity < backoffThreshold && - _capacity > 0 && _pressure/_capacity < backoffThreshold) { + _capacity > 0 && _pressure/_capacity < backoffThreshold) { math.floor(-1.0 * backoffRate * (capacity-pressure)).toInt } else diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala index 978d0c3da4..bce649b43b 100644 --- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala @@ -180,315 +180,315 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers for(a <- List(t1,t2,d1,d2)) a.stop } - // Actor Pool Capacity Tests - - // - // make sure the pool is of the fixed, expected capacity - // - @Test def testFixedCapacityActorPool = { + // Actor Pool Capacity Tests + + // + // make sure the pool is of the fixed, expected capacity + // + @Test def testFixedCapacityActorPool = { - val latch = new CountDownLatch(2) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with FixedCapacityStrategy - with SmallestMailboxSelector - { - def factory = actorOf(new Actor { - def receive = { - case _ => - counter.incrementAndGet - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool = actorOf(new TestPool).start - pool ! "a" - pool ! "b" - val done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - counter.get must be (2) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - pool stop - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - // - @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { + val latch = new CountDownLatch(2) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with FixedCapacityStrategy + with SmallestMailboxSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + counter.incrementAndGet + latch.countDown + } + }) + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + } + + val pool = actorOf(new TestPool).start + pool ! "a" + pool ! "b" + val done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + counter.get must be (2) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + pool stop + } + + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + // + @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with ActiveFuturesPressureCapacitor - with SmallestMailboxSelector + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with ActiveFuturesPressureCapacitor + with SmallestMailboxSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown + } + }) - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - // - // first message should create the minimum number of delgates - // - val pool = actorOf(new TestPool).start - pool ! 1 - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + // + // first message should create the minimum number of delgates + // + val pool = actorOf(new TestPool).start + pool ! 1 + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool !!! t - Thread.sleep(50) - } - } - - // - // 2 more should go thru w/out triggering more - // - loops = 2 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool !!! t + Thread.sleep(50) + } + } + + // + // 2 more should go thru w/out triggering more + // + loops = 2 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // - // a whole bunch should max it out - // - loops = 10 - loop(500) + // + // a whole bunch should max it out + // + loops = 10 + loop(500) - done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - - pool stop - } + done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) + + pool stop + } - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - // - @Test def testBoundedCapacityActorPoolWithMailboxPressure = { + // + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + // + @Test def testBoundedCapacityActorPoolWithMailboxPressure = { - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector + var latch = new CountDownLatch(3) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet - latch.countDown - } - }) + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + counter.incrementAndGet + latch.countDown + } + }) - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + } - val pool = actorOf(new TestPool).start + val pool = actorOf(new TestPool).start - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool ! t - } - } - - // - // send a few messages and observe pool at its lower bound - // - loops = 3 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + var loops = 0 + def loop(t:Int) = { + latch = new CountDownLatch(loops) + counter.set(0) + for (m <- 0 until loops) { + pool ! t + } + } + + // + // send a few messages and observe pool at its lower bound + // + loops = 3 + loop(500) + var done = latch.await(5,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // - // send a bunch over the theshold and observe an increment - // - loops = 15 - loop(500) + // + // send a bunch over the theshold and observe an increment + // + loops = 15 + loop(500) - done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) - - pool stop - } - - // Actor Pool Selector Tests - - @Test def testRoundRobinSelector = { + done = latch.await(10,TimeUnit.SECONDS) + done must be (true) + counter.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) + + pool stop + } + + // Actor Pool Selector Tests + + @Test def testRoundRobinSelector = { - var latch = new CountDownLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - class TestPool1 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector + var latch = new CountDownLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] + + class TestPool1 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 1 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool1 = actorOf(new TestPool1).start - pool1 ! "a" - pool1 ! "b" - var done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (1) - pool1 stop - - class TestPool2 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown + } + }) + + def limit = 1 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + } + + val pool1 = actorOf(new TestPool1).start + pool1 ! "a" + pool1 ! "b" + var done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (1) + pool1 stop + + class TestPool2 extends Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route - } + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch.countDown + } + }) + + def limit = 2 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + } - latch = new CountDownLatch(2) - delegates clear - - val pool2 = actorOf(new TestPool2).start - pool2 ! "a" - pool2 ! "b" - done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (2) - pool2 stop - } - - // Actor Pool Filter Tests - - // - // reuse previous test to max pool then observe filter reducing capacity over time - // - @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + latch = new CountDownLatch(2) + delegates clear + + val pool2 = actorOf(new TestPool2).start + pool2 ! "a" + pool2 ! "b" + done = latch.await(1,TimeUnit.SECONDS) + done must be (true) + delegates.size must be (2) + pool2 stop + } + + // Actor Pool Filter Tests + + // + // reuse previous test to max pool then observe filter reducing capacity over time + // + @Test def testBoundedCapacityActorPoolWithBackoffFilter = { - var latch = new CountDownLatch(10) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with Filter - with RunningMeanBackoff - with BasicRampup - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - latch.countDown - } - }) + var latch = new CountDownLatch(10) + class TestPool extends Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with Filter + with RunningMeanBackoff + with BasicRampup + { + def factory = actorOf(new Actor { + def receive = { + case n:Int => + Thread.sleep(n) + latch.countDown + } + }) - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route - } + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + } - // - // put some pressure on the pool - // - val pool = actorOf(new TestPool).start - for (m <- 0 to 10) pool ! 250 - Thread.sleep(5) - val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size - z must be >= (2) - var done = latch.await(10,TimeUnit.SECONDS) - done must be (true) + // + // put some pressure on the pool + // + val pool = actorOf(new TestPool).start + for (m <- 0 to 10) pool ! 250 + Thread.sleep(5) + val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size + z must be >= (2) + var done = latch.await(10,TimeUnit.SECONDS) + done must be (true) - - // - // - // - for (m <- 0 to 3) { - pool ! 1 - Thread.sleep(500) - } - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) - - pool stop - } - - + + // + // + // + for (m <- 0 to 3) { + pool ! 1 + Thread.sleep(500) + } + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) + + pool stop + } + + }