diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 98b4e57473..fc42e6f50b 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -65,17 +65,9 @@ trait DefaultActorPool extends ActorPool { this: Actor => self reply_? Stats(_delegates length) case max: MaximumNumberOfRestartsWithinTimeRangeReached => _delegates = _delegates filterNot { _.uuid == max.victim.uuid } - case msg => _capacity() - _select() foreach { delegate => - self.senderFuture match { - case None => - delegate ! msg - case Some(future) => - delegate.!!!(msg, TimeUnit.NANOSECONDS.toMillis(future.timeoutInNanos)).onComplete( future.completeWith(_) ) - } - } + _select() foreach { _ forward msg } } private def _capacity() = { diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala index bce649b43b..1be34a41bc 100644 --- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala @@ -180,44 +180,52 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers for(a <- List(t1,t2,d1,d2)) a.stop } - // Actor Pool Capacity Tests - - // - // make sure the pool is of the fixed, expected capacity - // - @Test def testFixedCapacityActorPool = { + // Actor Pool Capacity Tests - val latch = new CountDownLatch(2) - val counter = new AtomicInteger(0) - class TestPool extends Actor with DefaultActorPool - with FixedCapacityStrategy - with SmallestMailboxSelector - { - def factory = actorOf(new Actor { - def receive = { - case _ => - counter.incrementAndGet - latch.countDown - } - }) - - def limit = 2 - def selectionCount = 1 - def partialFill = true - def instance = factory - def receive = _route - } - - val pool = actorOf(new TestPool).start - pool ! "a" - pool ! "b" - val done = latch.await(1,TimeUnit.SECONDS) - done must be (true) - counter.get must be (2) - (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) - - pool stop + // + // make sure the pool is of the fixed, expected capacity + // + @Test def testFixedCapacityActorPool = { + val latch = new CountDownLatch(2) + val counter = new AtomicInteger(0) + class TestPool extends Actor with DefaultActorPool + with FixedCapacityStrategy + with SmallestMailboxSelector + { + def factory = actorOf(new Actor { + def receive = { + case _ => + counter.incrementAndGet + latch.countDown + self reply_? "success" } + }) + + def limit = 2 + def selectionCount = 1 + def partialFill = true + def instance = factory + def receive = _route + } + + val successes = new CountDownLatch(2) + implicit val successCounterActor = Some(actorOf(new Actor { + def receive = { + case "success" => successes.countDown + } + }).start) + + val pool = actorOf(new TestPool).start + pool ! "a" + pool ! "b" + + latch.await(1,TimeUnit.SECONDS) must be (true) + successes.await(1,TimeUnit.SECONDS) must be (true) + counter.get must be (2) + (pool !! ActorPool.Stat).asInstanceOf[Option[ActorPool.Stats]].get.size must be (2) + + pool stop + } // // make sure the pool starts at the expected lower limit and grows to the upper as needed