Covers tickets 988 and 915. Changed ActorRef to notify the supervisor with a max-retry message when a temporary actor shuts down. Actor pools now require a supervision strategy. Updated pool docs.

Garrick Evans 2011-07-15 14:25:09 -07:00
parent 2ec7c848b3
commit 05a1144705
4 changed files with 345 additions and 44 deletions

View file

@@ -178,7 +178,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with SmallestMailboxSelector {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -218,7 +218,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"pass ticket #705" in {
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
@@ -256,7 +256,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -321,7 +321,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -375,7 +375,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -404,7 +404,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
delegates.clear()
val pool2 = actorOf(
new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -434,7 +434,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val latch = TestLatch(10)
val pool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -480,7 +480,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"support typed actors" in {
import RoutingSpec._
import TypedActor._
def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
@@ -499,6 +499,214 @@ class RoutingSpec extends WordSpec with MustMatchers {
for ((i, r) ← results) r.get must equal(i * i)
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case akka.Die ⇒
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (4)
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
def receive = {
case BadState ⇒
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die =>
throw new RuntimeException
case _ => pingCount.incrementAndGet
}
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be (2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}
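Aside: after this commit a pool no longer compiles without an ActorPoolSupervisionConfig mixin, since DefaultActorPool's self-type now demands one. A condensed sketch of the resulting pattern, not part of the diff; the echo delegate body is illustrative, all other names appear in this commit:

    val pool = actorOf(
      new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig
        with BoundedCapacityStrategy with MailboxPressureCapacitor
        with SmallestMailboxSelector with BasicNoBackoffFilter {
        def lowerBound = 2
        def upperBound = 4
        def pressureThreshold = 1
        def rampupRate = 0.1
        def partialFill = true
        def selectionCount = 1
        def instance = factory
        def receive = _route
        def factory = actorOf(new Actor {
          def receive = { case msg ⇒ self reply_? msg } // illustrative echo delegate
        }).start()
      }).start()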

View file

@@ -11,7 +11,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
"A ? call to an actor pool" should {
"reuse the proper timeout" in {
val actorPool = actorOf(
new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1

View file

@@ -772,7 +772,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
lifeCycle match {
case Temporary ⇒
shutDownTemporaryActor(this)
shutDownTemporaryActor(this, reason)
true
case _ ⇒ // either permanent or none where default is permanent
@@ -815,7 +815,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
val actorRef = i.next
actorRef.lifeCycle match {
// either permanent or none where default is permanent
case Temporary ⇒ shutDownTemporaryActor(actorRef)
case Temporary ⇒ shutDownTemporaryActor(actorRef, reason)
case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
@@ -841,9 +841,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
case valid ⇒ valid
}
private def shutDownTemporaryActor(temporaryActor: ActorRef) {
private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) {
temporaryActor.stop()
_linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
// when this comes down through the handleTrapExit path, we get here when the temp actor is restarted
notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason))
// if last temporary actor is gone, then unlink me from supervisor
if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
true
@@ -860,7 +862,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, reason))
else {
lifeCycle match {
case Temporary ⇒ shutDownTemporaryActor(this)
case Temporary ⇒ shutDownTemporaryActor(this, reason)
case _ ⇒ dispatcher.resume(this) //Resume processing for this actor
}
}
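The practical effect of the ActorRef change above: a supervisor now receives MaximumNumberOfRestartsWithinTimeRangeReached when a Temporary actor is shut down, instead of the actor vanishing silently. A hedged sketch of a supervisor-side handler; the println body is illustrative, while the message shape matches the call above (maxNrOfRetries = Some(0), withinTimeRange = None):

    def receive = {
      case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, reason) ⇒
        println("temporary actor %s shut down: %s".format(victim.uuid, reason))
    }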

View file

@@ -4,8 +4,9 @@
package akka.routing
import akka.actor.{ Actor, ActorRef, PoisonPill }
import akka.actor.{ Actor, ActorRef, PoisonPill, Death, MaximumNumberOfRestartsWithinTimeRangeReached }
import akka.dispatch.{ Promise }
import akka.config.Supervision._
/**
* Actor pooling
@@ -35,44 +36,102 @@ object ActorPool {
* Defines the nature of an actor pool.
*/
trait ActorPool {
def instance(): ActorRef //Question, Instance of what?
def capacity(delegates: Seq[ActorRef]): Int //Question, What is the semantics of this return value?
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] //Question, Why does select return this instead of an ordered Set?
/**
* Adds a new actor to the pool. The DefaultActorPool implementation will start and link (supervise) this actor.
* This method is invoked whenever the pool determines it must boost capacity.
* @return A new actor for the pool
*/
def instance(): ActorRef
/**
* Returns the overall desired change in pool capacity. This method is used by non-static pools as the means
* for the capacity strategy to influence the pool.
@param delegates The current sequence of pooled actors
* @return the number of delegates by which the pool should be adjusted (positive, negative or zero)
*/
def capacity(delegates: Seq[ActorRef]): Int
/**
Provides the results of the selector, one or more actors, to which an incoming message is forwarded.
This method returns a sequence, since a selector might return more than one actor to handle the message.
You might want this in order to perform redundant processing of particularly error-prone messages.
@param delegates The current sequence of pooled actors
@return the sequence of actors to which the message will be delivered
*/
def select(delegates: Seq[ActorRef]): Seq[ActorRef]
}
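// Editor's sketch, not part of this commit: a smallest-possible conforming
// implementation of the contract above. It grows toward two delegates and
// always routes to the first one; the delegate body is illustrative.
trait TinyPool extends ActorPool { this: Actor ⇒
def instance() = Actor.actorOf(new Actor { def receive = { case _ ⇒ } }).start()
def capacity(delegates: Seq[ActorRef]) = 2 - delegates.size // delta toward 2
def select(delegates: Seq[ActorRef]) = delegates take 1
}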
/**
* A default implementation of a pool, on each message to route,
* - checks the current capacity and adjusts accordingly if needed
* - routes the incoming message to a selection set of delegate actors
* Defines the configuration options for how the pool supervises the actors.
*/
trait DefaultActorPool extends ActorPool { this: Actor ⇒
trait ActorPoolSupervisionConfig {
/**
* Defines the default fault handling strategy to be employed by the pool.
*/
def poolFaultHandler: FaultHandlingStrategy
}
/**
* Provides a default implementation of the supervision configuration by
* defining a One-for-One fault handling strategy, trapping exceptions,
* limited to 5 retries within 1 second.
*
* This is just a basic strategy and implementors are encouraged to define
* something more appropriate for their needs.
*/
trait DefaultActorPoolSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
}
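// A custom policy is just another mixin (sketch; the retry settings here are
// illustrative, compare LimitedTrapSupervisionConfig in the spec above):
trait LenientPoolSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[Exception]), 10, 5000)
}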
/**
* A default implementation of a pool that:
First, invokes the pool's capacitor, which tells it, based on the current delegate count
and its own heuristic, by how many delegates the pool should be resized. Resizing can
be incremental, decremental or flat. If there is a change to capacity, new delegates
are added or existing ones are removed. Removed actors are sent the PoisonPill message.
New actors are automatically started and linked. The pool supervises the actors and will
use the fault handling strategy specified by the mixed-in ActorPoolSupervisionConfig.
Pooled actors may have any lifecycle. If you're testing pool sizes at runtime, take a
look at the unit tests... Any delegate with a <b>Permanent</b> lifecycle will be
restarted and the pool size will be level with what it was prior to the fault. In just
about every other case, e.g. when the delegates are <b>Temporary</b> or the delegate cannot be
restarted within the time interval specified in the fault handling strategy, the pool will
temporarily be short that actor (it will have been removed but not back-filled). The
back-fill, if any is required, will occur on the next message [as usual].
*
Second, invokes the pool's selector, which returns the list of delegates that are to receive
the incoming message. Selectors may return more than one actor. If <i>partialFill</i>
is true then it might also be the case that fewer than the desired number of actors will be
returned.
*
Lastly, routes the incoming message, by forwarding, to each delegate in the selected set.
*/
trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervisionConfig ⇒
import ActorPool._
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
protected var _delegates = Vector[ActorRef]()
private var _lastCapacityChange = 0
private var _lastSelectorCount = 0
protected[akka] var _delegates = Vector[ActorRef]()
override def postStop() = _delegates foreach { delegate ⇒
try {
delegate ! PoisonPill
} catch { case e: Exception ⇒ } //Ignore any exceptions here
override def preStart() {
self.faultHandler = poolFaultHandler
}
override def postStop() {
_delegates foreach { delegate ⇒
try {
delegate ! PoisonPill
} catch { case e: Exception ⇒ } //Ignore any exceptions here
}
}
protected def _route(): Receive = {
// for testing...
case Stat ⇒
self reply_? Stats(_delegates length)
case max: MaximumNumberOfRestartsWithinTimeRangeReached ⇒
_delegates = _delegates filterNot { _.uuid == max.victim.uuid }
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case Death(victim, _) =>
_delegates = _delegates filterNot { _.uuid == victim.uuid }
case msg ⇒
resizeIfAppropriate()
select(_delegates) match {
case (selectedDelegates, count) ⇒
_lastSelectorCount = count
selectedDelegates foreach { _ forward msg } //Should we really send the same message to several actors?
}
select(_delegates) foreach { _ forward msg }
}
private def resizeIfAppropriate() {
@@ -95,14 +154,15 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
case _ ⇒ _delegates //No change
}
_lastCapacityChange = requestedCapacity
_delegates = newDelegates
}
}
/**
* Selectors
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
*
* These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool.
* Note that it's acceptable to return more than one actor to handle a given message.
*/
/**
@@ -112,7 +172,7 @@ trait SmallestMailboxSelector {
def selectionCount: Int
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
var set: Seq[ActorRef] = Nil
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
@@ -121,7 +181,7 @@ trait SmallestMailboxSelector {
take -= set.size
}
(set.iterator, set.size)
set
}
}
@@ -134,7 +194,7 @@ trait RoundRobinSelector {
def selectionCount: Int
def partialFill: Boolean
def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
val length = delegates.length
val take = if (partialFill) math.min(selectionCount, length)
else selectionCount
@@ -145,13 +205,16 @@ trait RoundRobinSelector {
delegates(_last)
}
(set.iterator, set.size)
set
}
}
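// Illustrative custom selector, not from this commit: with the new
// Seq[ActorRef] signature a selector is any trait providing select;
// this one picks a single delegate at random.
trait RandomSelector {
def select(delegates: Seq[ActorRef]): Seq[ActorRef] =
if (delegates.isEmpty) Nil
else Seq(delegates(scala.util.Random.nextInt(delegates.length)))
}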
/**
* Capacitors
* These traits define how to alter the size of the pool
*
* These traits define how to alter the size of the pool according to some desired behavior.
* Capacitors are required (minimally) by the pool to establish bounds on the number of delegates
* that may exist in the pool.
*/
/**
@@ -163,7 +226,13 @@ trait FixedSizeCapacitor {
}
/**
* Constrains the pool capacity to a bounded range
Constrains the pool capacity to a bounded range.
This capacitor employs 'pressure capacitors' (sorry for the unfortunate, confusing naming)
to feed a 'pressure' delta into the capacity function. This measure is
basically the difference between the current pressure level and a pre-established threshold.
When using this capacitor you must provide a method called 'pressure' or mix in
one of the PressureCapacitor traits below.
*
*/
trait BoundedCapacitor {
def lowerBound: Int
@@ -200,17 +269,39 @@ trait ActiveFuturesPressureCapacitor {
}
/**
*
*/
trait CapacityStrategy {
import ActorPool._
/**
* This method returns a 'pressure level' that will be fed into the capacitor and
evaluated against the established threshold. For instance, in general, if
* the current pressure level exceeds the capacity of the pool, new delegates will
* be added.
*/
def pressure(delegates: Seq[ActorRef]): Int
/**
* This method can be used to smooth the response of the capacitor by considering
* the current pressure and current capacity.
*/
def filter(pressure: Int, capacity: Int): Int
protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
}
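// Illustrative filter, not from this commit: requests one extra delegate
// whenever pressure exceeds the current capacity and never shrinks; the
// BasicFilter/RunningMeanBackoff mixes used in the specs are smarter.
trait GrowOnlyFilter {
def filter(pressure: Int, capacity: Int): Int =
if (pressure > capacity) 1 else 0
}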
/**
Use this trait to set up a pool that uses a fixed delegate count.
*/
trait FixedCapacityStrategy extends FixedSizeCapacitor
/**
Use this trait to set up a pool that may have a variable number of
delegates but always within an established upper and lower limit.
*
If you mix this into your pool implementation, you must also provide a
PressureCapacitor and a Filter.
*/
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
/**