diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 43248224c2..fe971777f6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -38,19 +38,18 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool = actorOf( Props(new Actor with DefaultActorPool with FixedCapacityStrategy with SmallestMailboxSelector { - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case _ ⇒ count.incrementAndGet latch.countDown() tryReply("success") } - }) + })) def limit = 2 def selectionCount = 1 def partialFill = true - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -85,17 +84,16 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def backoffThreshold = 0.5 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case req: String ⇒ { sleepFor(10 millis) tryReply("Response") } } - }) + })) }).withFaultHandler(faultHandler)) try { @@ -116,21 +114,20 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool = actorOf( Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case n: Int ⇒ sleepFor(n millis) count.incrementAndGet latch.countDown() } - }) + })) def lowerBound = 2 def upperBound = 4 def rampupRate = 0.1 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -181,14 +178,14 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool = actorOf( Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case n: Int ⇒ sleepFor(n millis) count.incrementAndGet latch.countDown() } - }) + })) def lowerBound = 2 def upperBound = 4 @@ -196,7 +193,6 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def rampupRate = 0.1 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -235,19 +231,19 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool1 = actorOf( Props(new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { + + def instance(p: Props): ActorRef = actorOf(p.withCreator(new Actor { def receive = { case _ ⇒ delegates put (self.uuid.toString, "") latch1.countDown() } - }) + })) def limit = 1 def selectionCount = 1 def rampupRate = 0.1 def partialFill = true - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -264,19 +260,18 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool2 = actorOf( Props(new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter { - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case _ ⇒ delegates put (self.uuid.toString, "") latch2.countDown() } - }) + })) def limit = 2 def selectionCount = 1 def rampupRate = 0.1 def partialFill = false - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -294,13 +289,13 @@ class ActorPoolSpec extends WordSpec with MustMatchers { val pool = actorOf( Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case n: Int ⇒ sleepFor(n millis) latch.countDown() } - }) + })) def lowerBound = 1 def upperBound = 5 @@ -310,7 +305,6 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def rampupRate = 0.1 def backoffRate = 0.50 def backoffThreshold = 0.50 - def instance = factory def receive = _route }).withFaultHandler(faultHandler)) @@ -348,7 +342,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def rampupRate = 0.1 def backoffRate = 0.50 def backoffThreshold = 0.50 - def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) + def instance(p: Props) = getActorRefFor(typedActorOf[Foo, FooImpl](p)) def receive = _route } @@ -374,10 +368,9 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def backoffThreshold = 0.5 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(Props(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { @@ -386,7 +379,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }).withSupervisor(self)) + })) }).withFaultHandler(faultHandler)) val pool2 = actorOf( @@ -398,10 +391,9 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def backoffThreshold = 0.5 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(Props(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { @@ -410,7 +402,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }).withSupervisor(self)) + })) }).withFaultHandler(faultHandler)) val pool3 = actorOf( @@ -422,10 +414,9 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def backoffThreshold = 0.5 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(Props(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } @@ -437,7 +428,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }).withSupervisor(self)) + })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) // default lifecycle @@ -521,10 +512,9 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def backoffThreshold = 0.5 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(Props(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { @@ -535,7 +525,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }).withSupervisor(self)) + })) }).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000))) // actor comes back right away diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 1dcecbc0fe..dcb758234e 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -17,16 +17,15 @@ class Ticket703Spec extends WordSpec with MustMatchers { def rampupRate = 0.1 def partialFill = true def selectionCount = 1 - def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(new Actor { + def instance(p: Props) = actorOf(p.withCreator(new Actor { def receive = { case req: String ⇒ Thread.sleep(6000L) tryReply("Response") } - }) + })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) (actorPool.?("Ping", 10000)).await.result must be === Some("Response") } diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 053b3ec345..10aeedcc34 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -40,7 +40,13 @@ trait ActorPool { * This method is invoked whenever the pool determines it must boost capacity. * @return A new actor for the pool */ - def instance(): ActorRef + def instance(defaults: Props): ActorRef + + /** + * This method gets called when a delegate is to be evicted, by default it sends a PoisonPill to the delegate + */ + def evict(delegate: ActorRef): Unit = delegate ! PoisonPill + /** * Returns the overall desired change in pool capacity. This method is used by non-static pools as the means * for the capacity strategy to influence the pool. @@ -87,8 +93,11 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ protected[akka] var _delegates = Vector[ActorRef]() + val defaultProps: Props = Props.default.withSupervisor(this.self).withDispatcher(this.context.dispatcher) + override def postStop() { - _delegates foreach { _ ! PoisonPill } + _delegates foreach evict + _delegates = Vector.empty } protected def _route(): Receive = { @@ -109,7 +118,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ case qty if qty > 0 ⇒ _delegates ++ { for (i ← 0 until requestedCapacity) yield { - val delegate = instance() + val delegate = instance(defaultProps) self link delegate delegate } @@ -117,7 +126,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ case qty if qty < 0 ⇒ _delegates.splitAt(_delegates.length + requestedCapacity) match { case (keep, abandon) ⇒ - abandon foreach { _ ! PoisonPill } + abandon foreach evict keep } case _ ⇒ _delegates //No change diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 2f195c04cf..e1a47472aa 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -231,10 +231,10 @@ Examples def rampupRate = 0.1 def partialFill = true def selectionCount = 1 - def instance = actorOf(new Actor {def receive = {case n:Int => + def instance(defaults: Props) = actorOf(defaults.withCreator(new Actor {def receive = {case n:Int => Thread.sleep(n) counter.incrementAndGet - latch.countDown()}}) + latch.countDown()}})) } .. code-block:: scala @@ -256,9 +256,9 @@ Examples def rampupRate = 0.1 def backoffRate = 0.50 def backoffThreshold = 0.50 - def instance = actorOf(new Actor {def receive = {case n:Int => + def instance(defaults: Props) = actorOf(defaults.withCreator(new Actor {def receive = {case n:Int => Thread.sleep(n) - latch.countDown()}}) + latch.countDown()}})) } Taken from the unit test `spec `_.