diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index d79bd0651e..96f93fc160 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,537 +1,532 @@ package akka.actor.routing +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ +import akka.testing.Testing.{sleepFor, testMillis} +import akka.util.duration._ + import akka.actor.Actor import akka.actor.Actor._ - -import org.scalatest.Suite -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.matchers.MustMatchers -import org.junit.Test - -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{CountDownLatch, TimeUnit} import akka.routing._ -@RunWith(classOf[JUnitRunner]) -class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers { +import java.util.concurrent.atomic.AtomicInteger + + +class RoutingSpec extends WordSpec with MustMatchers { import Routing._ - @Test def testDispatcher = { - val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") - val targetOk = new AtomicInteger(0) - val t1 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() + "Routing" must { - val t2 = actorOf( new Actor() { - def receive = { - case `testMsg3` => self.reply(11) - } - }).start() + "dispatch" in { + val Test1 = "test1" + val Test2 = "test2" + val Test3 = "test3" - val d = dispatcherActor { - case `testMsg1`|`testMsg2` => t1 - case `testMsg3` => t2 - }.start() - - val result = for { - a <- (d !! (testMsg1, 5000)).as[Int] - b <- (d !! (testMsg2, 5000)).as[Int] - c <- (d !! (testMsg3, 5000)).as[Int] - } yield a + b + c - - result.isDefined must be (true) - result.get must be(21) - - for(a <- List(t1,t2,d)) a.stop() - } - - @Test def testLogger = { - val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] - val latch = new CountDownLatch(2) - val t1 = actorOf(new Actor { def receive = { case _ => } }).start() - val l = loggerActor(t1,(x) => { msgs.add(x); latch.countDown() }).start() - val foo : Any = "foo" - val bar : Any = "bar" - l ! foo - l ! bar - val done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - msgs must ( have size (2) and contain (foo) and contain (bar) ) - t1.stop() - l.stop() - } - - @Test def testSmallestMailboxFirstDispatcher = { - val t1ProcessedCount = new AtomicInteger(0) - val latch = new CountDownLatch(500) - val t1 = actorOf(new Actor { - def receive = { - case x => - Thread.sleep(50) // slow actor - t1ProcessedCount.incrementAndGet - latch.countDown() - } - }).start() - - val t2ProcessedCount = new AtomicInteger(0) - val t2 = actorOf(new Actor { - def receive = { - case x => t2ProcessedCount.incrementAndGet - latch.countDown() - } - }).start() - val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - for (i <- 1 to 500) d ! i - val done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - t1ProcessedCount.get must be < (t2ProcessedCount.get) // because t1 is much slower and thus has a bigger mailbox all the time - for(a <- List(t1,t2,d)) a.stop() - } - - @Test def testListener = { - val latch = new CountDownLatch(2) - val foreachListener = new CountDownLatch(2) - val num = new AtomicInteger(0) - val i = actorOf(new Actor with Listeners { - def receive = listenerManagement orElse { - case "foo" => gossip("bar") - } - }) - i.start() - - def newListener = actorOf(new Actor { - def receive = { - case "bar" => - num.incrementAndGet - latch.countDown() - case "foo" => foreachListener.countDown() - } - }).start() - - val a1 = newListener - val a2 = newListener - val a3 = newListener - - i ! Listen(a1) - i ! Listen(a2) - i ! Listen(a3) - i ! Deafen(a3) - i ! WithListeners(_ ! "foo") - i ! "foo" - - val done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - num.get must be (2) - val withListeners = foreachListener.await(5,TimeUnit.SECONDS) - withListeners must be (true) - for(a <- List(i,a1,a2,a3)) a.stop() - } - - @Test def testIsDefinedAt = { - import akka.actor.ActorRef - - val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") - - val t1 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t2 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t3 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val t4 = actorOf( new Actor() { - def receive = { - case `testMsg1` => self.reply(3) - case `testMsg2` => self.reply(7) - } - } ).start() - - val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) - - t1.isDefinedAt(testMsg1) must be (true) - t1.isDefinedAt(testMsg3) must be (false) - t2.isDefinedAt(testMsg1) must be (true) - t2.isDefinedAt(testMsg3) must be (false) - d1.isDefinedAt(testMsg1) must be (true) - d1.isDefinedAt(testMsg3) must be (false) - d2.isDefinedAt(testMsg1) must be (true) - d2.isDefinedAt(testMsg3) must be (false) - - for(a <- List(t1,t2,d1,d2)) a.stop() - } - - // Actor Pool Capacity Tests - - // - // make sure the pool is of the fixed, expected capacity - // - @Test def testFixedCapacityActorPool = { - val latch = new CountDownLatch(2) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with FixedCapacityStrategy - with SmallestMailboxSelector - { - def factory = actorOf(new Actor { + val t1 = actorOf(new Actor { def receive = { - case _ => - counter.incrementAndGet - latch.countDown() - self reply_? "success" + case Test1 => self.reply(3) + case Test2 => self.reply(7) } - }) + }).start() - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route - } - - val successes = new CountDownLatch(2) - implicit val successCounterActor = Some(actorOf(new Actor { - def receive = { - case "success" => successes.countDown() - } - }).start()) - - val pool = actorOf(new TestPool).start() - pool ! "a" - pool ! "b" - - latch.await(1,TimeUnit.SECONDS) must be (true) - successes.await(1,TimeUnit.SECONDS) must be (true) - counter.get must be (2) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - pool stop - } - - @Test def testTicket705 = { - - val actorPool = actorOf(new Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with BasicFilter { - //with BasicNoBackoffFilter { - def lowerBound = 2 - def upperBound = 20 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - def receive = { - case req: String => { - Thread.sleep(10L) - self.reply_?("Response") - } - } - }) - }).start() - - try { - (for(count <- 1 to 500) yield actorPool.!!![String]("Test", 20000)) foreach { - _.await.resultOrException.get must be ("Response") - } - } finally { - actorPool.stop() - } - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of blocking pooled actors - // - @Test def testBoundedCapacityActorPoolWithActiveFuturesPressure = { - - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with ActiveFuturesPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { + val t2 = actorOf( new Actor() { def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet + case Test3 => self.reply(11) + } + }).start() + + val d = dispatcherActor { + case Test1 | Test2 => t1 + case Test3 => t2 + }.start() + + val result = for { + a <- (d !! (Test1, testMillis(5 seconds))).as[Int] + b <- (d !! (Test2, testMillis(5 seconds))).as[Int] + c <- (d !! (Test3, testMillis(5 seconds))).as[Int] + } yield a + b + c + + result.isDefined must be (true) + result.get must be (21) + + for(a <- List(t1, t2, d)) a.stop() + } + + "have messages logged" in { + val msgs = new java.util.concurrent.ConcurrentSkipListSet[Any] + val latch = TestLatch(2) + + val actor = actorOf(new Actor { + def receive = { case _ => } + }).start() + + val logger = loggerActor(actor, x => { msgs.add(x); latch.countDown() }).start() + + val foo: Any = "foo" + val bar: Any = "bar" + + logger ! foo + logger ! bar + + latch.await + + msgs must ( have size (2) and contain (foo) and contain (bar) ) + + actor.stop() + logger.stop() + } + + "dispatch to smallest mailbox" in { + val t1Count = new AtomicInteger(0) + val t2Count = new AtomicInteger(0) + val latch = TestLatch(500) + + val t1 = actorOf(new Actor { + def receive = { + case x => + sleepFor(50 millis) // slow actor + t1Count.incrementAndGet latch.countDown() } - }) + }).start() - def lowerBound = 2 - def upperBound = 4 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - } - - // - // first message should create the minimum number of delgates - // - val pool = actorOf(new TestPool).start() - pool ! 1 - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool !!! t - Thread.sleep(50) - } - } - - // - // 2 more should go thru w/out triggering more - // - loops = 2 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - // - // a whole bunch should max it out - // - loops = 10 - loop(500) - - done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) - - pool stop - } - - // - // make sure the pool starts at the expected lower limit and grows to the upper as needed - // as influenced by the backlog of messages in the delegate mailboxes - // - @Test def testBoundedCapacityActorPoolWithMailboxPressure = { - - var latch = new CountDownLatch(3) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { + val t2 = actorOf(new Actor { def receive = { - case n:Int => - Thread.sleep(n) - counter.incrementAndGet + case x => + t2Count.incrementAndGet latch.countDown() } - }) + }).start() - def lowerBound = 2 - def upperBound = 4 - def pressureThreshold = 3 - def rampupRate = 0.1 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route + val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) + + for (i <- 1 to 500) d ! i + + latch.await(10 seconds) + + // because t1 is much slower and thus has a bigger mailbox all the time + t1Count.get must be < (t2Count.get) + + for(a <- List(t1, t2, d)) a.stop() } - val pool = actorOf(new TestPool).start() - var loops = 0 - def loop(t:Int) = { - latch = new CountDownLatch(loops) - counter.set(0) - for (m <- 0 until loops) { - pool ! t + "listen" in { + val fooLatch = TestLatch(2) + val barLatch = TestLatch(2) + val barCount = new AtomicInteger(0) + + val broadcast = actorOf(new Actor with Listeners { + def receive = listenerManagement orElse { + case "foo" => gossip("bar") } - } + }).start() + + def newListener = actorOf(new Actor { + def receive = { + case "bar" => + barCount.incrementAndGet + barLatch.countDown() + case "foo" => + fooLatch.countDown() + } + }).start() + + val a1 = newListener + val a2 = newListener + val a3 = newListener + + broadcast ! Listen(a1) + broadcast ! Listen(a2) + broadcast ! Listen(a3) + + broadcast ! Deafen(a3) + + broadcast ! WithListeners(_ ! "foo") + broadcast ! "foo" + + barLatch.await + barCount.get must be (2) + + fooLatch.await + + for(a <- List(broadcast, a1 ,a2 ,a3)) a.stop() + } + + "be defined at" in { + import akka.actor.ActorRef + + val Yes = "yes" + val No = "no" + + def testActor() = actorOf( new Actor() { + def receive = { + case Yes => "yes" + } + }).start() + + val t1 = testActor() + val t2 = testActor() + val t3 = testActor() + val t4 = testActor() + + val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) + val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) + + t1.isDefinedAt(Yes) must be (true) + t1.isDefinedAt(No) must be (false) + t2.isDefinedAt(Yes) must be (true) + t2.isDefinedAt(No) must be (false) + d1.isDefinedAt(Yes) must be (true) + d1.isDefinedAt(No) must be (false) + d2.isDefinedAt(Yes) must be (true) + d2.isDefinedAt(No) must be (false) + + for(a <- List(t1, t2, d1, d2)) a.stop() + } + } + + "Actor Pool" must { + + "have expected capacity" in { + val latch = TestLatch(2) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with SmallestMailboxSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + count.incrementAndGet + latch.countDown() + self reply_? "success" + } + }).start() + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + val successes = TestLatch(2) + val successCounter = Some(actorOf(new Actor { + def receive = { + case "success" => successes.countDown() + } + }).start()) + + implicit val replyTo = successCounter + pool ! "a" + pool ! "b" + + latch.await + successes.await + + count.get must be (2) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + pool.stop() + } + + + "pass ticket #705" in { + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicFilter + { + def lowerBound = 2 + def upperBound = 20 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + def receive = { + case req: String => { + sleepFor(10 millis) + self.reply_?("Response") + } + } + }) + }).start() + + try { + (for (count <- 1 to 500) yield pool.!!![String]("Test", 20000)) foreach { + _.await.resultOrException.get must be ("Response") + } + } finally { + pool.stop() + } + } + + "grow as needed under pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of blocking pooled actors + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with ActiveFuturesPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + // first message should create the minimum number of delgates + + pool ! 1 + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m <- 0 until loops) { + pool !!! t + sleepFor(50 millis) + } + } + + // 2 more should go thru without triggering more + + loops = 2 + + loop(500) + latch.await + count.get must be (loops) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + // a whole bunch should max it out + + loops = 10 + loop(500) + latch.await + count.get must be (loops) + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (4) + + pool.stop() + } + + "grow as needed under mailbox pressure" in { + // make sure the pool starts at the expected lower limit and grows to the upper as needed + // as influenced by the backlog of messages in the delegate mailboxes + + var latch = TestLatch(3) + val count = new AtomicInteger(0) + + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + count.incrementAndGet + latch.countDown() + } + }) + + def lowerBound = 2 + def upperBound = 4 + def pressureThreshold = 3 + def rampupRate = 0.1 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + }).start() + + var loops = 0 + def loop(t: Int) = { + latch = TestLatch(loops) + count.set(0) + for (m <- 0 until loops) { + pool ! t + } + } + + // send a few messages and observe pool at its lower bound + loops = 3 + loop(500) + latch.await + count.get must be (loops) - // - // send a few messages and observe pool at its lower bound - // - loops = 3 - loop(500) - var done = latch.await(5,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - // // send a bunch over the theshold and observe an increment - // loops = 15 loop(500) - done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - counter.get must be (loops) + latch.await(10 seconds) + count.get must be (loops) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be >= (3) - pool stop - } - - // Actor Pool Selector Tests - - @Test def testRoundRobinSelector = { - - var latch = new CountDownLatch(2) - val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - - class TestPool1 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown() - } - }) - - def limit = 1 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = true - def instance = factory - def receive = _route + pool.stop() } - val pool1 = actorOf(new TestPool1).start() - pool1 ! "a" - pool1 ! "b" - var done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - delegates.size must be (1) - pool1 stop + "round robin" in { + val latch1 = TestLatch(2) + val delegates = new java.util.concurrent.ConcurrentHashMap[String, String] - class TestPool2 extends Actor with DefaultActorPool - with FixedCapacityStrategy - with RoundRobinSelector - with BasicNoBackoffFilter - { - def factory = actorOf(new Actor { - def receive = { - case _ => - delegates put(self.uuid.toString, "") - latch.countDown() - } - }) + val pool1 = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch1.countDown() + } + }) - def limit = 2 - def selectionCount = 2 - def rampupRate = 0.1 - def partialFill = false - def instance = factory - def receive = _route + def limit = 1 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = true + def instance = factory + def receive = _route + }).start() + + pool1 ! "a" + pool1 ! "b" + + latch1.await + delegates.size must be (1) + + pool1.stop() + + val latch2 = TestLatch(2) + delegates.clear() + + val pool2 = actorOf( + new Actor with DefaultActorPool + with FixedCapacityStrategy + with RoundRobinSelector + with BasicNoBackoffFilter + { + def factory = actorOf(new Actor { + def receive = { + case _ => + delegates put(self.uuid.toString, "") + latch2.countDown() + } + }) + + def limit = 2 + def selectionCount = 2 + def rampupRate = 0.1 + def partialFill = false + def instance = factory + def receive = _route + }).start() + + pool2 ! "a" + pool2 ! "b" + + latch2.await + delegates.size must be (2) + + pool2.stop() } - latch = new CountDownLatch(2) - delegates clear + "backoff" in { + val latch = TestLatch(10) - val pool2 = actorOf(new TestPool2).start() - pool2 ! "a" - pool2 ! "b" - done = latch.await(1, TimeUnit.SECONDS) - done must be (true) - delegates.size must be (2) - pool2 stop - } - - // Actor Pool Filter Tests + val pool = actorOf( + new Actor with DefaultActorPool + with BoundedCapacityStrategy + with MailboxPressureCapacitor + with SmallestMailboxSelector + with Filter + with RunningMeanBackoff + with BasicRampup + { + def factory = actorOf(new Actor { + def receive = { + case n: Int => + sleepFor(n millis) + latch.countDown() + } + }) - // - // reuse previous test to max pool then observe filter reducing capacity over time - // - @Test def testBoundedCapacityActorPoolWithBackoffFilter = { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = factory + def receive = _route + }).start() - var latch = new CountDownLatch(10) - class TestPool extends Actor with DefaultActorPool - with BoundedCapacityStrategy - with MailboxPressureCapacitor - with SmallestMailboxSelector - with Filter - with RunningMeanBackoff - with BasicRampup - { - def factory = actorOf(new Actor { - def receive = { - case n:Int => - Thread.sleep(n) - latch.countDown() - } - }) + // put some pressure on the pool - def lowerBound = 1 - def upperBound = 5 - def pressureThreshold = 1 - def partialFill = true - def selectionCount = 1 - def rampupRate = 0.1 - def backoffRate = 0.50 - def backoffThreshold = 0.50 - def instance = factory - def receive = _route + for (m <- 0 to 10) pool ! 250 + + sleepFor(5 millis) + + val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size + + z must be >= (2) + + // let it cool down + + for (m <- 0 to 3) { + pool ! 1 + sleepFor(500 millis) + } + + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) + + pool.stop() } - - - // - // put some pressure on the pool - // - val pool = actorOf(new TestPool).start() - for (m <- 0 to 10) pool ! 250 - Thread.sleep(5) - val z = (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size - z must be >= (2) - var done = latch.await(10,TimeUnit.SECONDS) - done must be (true) - - - // - // - // - for (m <- 0 to 3) { - pool ! 1 - Thread.sleep(500) - } - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be <= (z) - - pool stop } } +