diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 5f4ceedabb..2cdd3e2c4a 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -178,7 +178,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
- new Actor with DefaultActorPool with FixedCapacityStrategy with SmallestMailboxSelector {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with SmallestMailboxSelector {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -218,7 +218,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"pass ticket #705" in {
val pool = actorOf(
- new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
@@ -256,7 +256,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
- new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -321,7 +321,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val count = new AtomicInteger(0)
val pool = actorOf(
- new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -375,7 +375,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val delegates = new java.util.concurrent.ConcurrentHashMap[String, String]
val pool1 = actorOf(
- new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -404,7 +404,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
delegates.clear()
val pool2 = actorOf(
- new Actor with DefaultActorPool with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with FixedCapacityStrategy with RoundRobinSelector with BasicNoBackoffFilter {
def factory = actorOf(new Actor {
def receive = {
case _ ⇒
@@ -434,7 +434,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val latch = TestLatch(10)
val pool = actorOf(
- new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def factory = actorOf(new Actor {
def receive = {
case n: Int ⇒
@@ -480,7 +480,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
"support typed actors" in {
import RoutingSpec._
import TypedActor._
- def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
+ def createPool = new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
@@ -499,6 +499,214 @@ class RoutingSpec extends WordSpec with MustMatchers {
for ((i, r) ← results) r.get must equal(i * i)
}
+
+ "provide default supervision of pooled actors" in {
+ import akka.config.Supervision._
+ val pingCount = new AtomicInteger(0)
+ val deathCount = new AtomicInteger(0)
+ var keepDying = false
+
+ val pool1 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ => pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ val pool2 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ self.lifeCycle = Permanent
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ => pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ val pool3 = actorOf(
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ self.lifeCycle = Temporary
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ def receive = {
+ case akka.Die ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new RuntimeException
+ case _ => pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+ // default lifecycle
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! akka.Die
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (1)
+
+ // default lifecycle
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! akka.Die
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (2)
+
+ // permanent lifecycle
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool2 ! akka.Die
+ sleepFor(2 seconds)
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (1)
+
+ // permanent lifecycle
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool2 ! akka.Die
+ sleepFor(2 seconds)
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool2 ! "ping"
+ (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (2)
+
+ // temporary lifecycle
+ pingCount.set(0)
+ keepDying = false
+ pool3 ! "ping"
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool3 ! akka.Die
+ sleepFor(2 seconds)
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool3 ! "ping"
+ pool3 ! "ping"
+ pool3 ! "ping"
+ (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (4)
+ }
+
+ "support customizable supervision config of pooled actors" in {
+ import akka.config.Supervision._
+ val pingCount = new AtomicInteger(0)
+ val deathCount = new AtomicInteger(0)
+ var keepDying = false
+
+ trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
+ def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
+ }
+
+ object BadState
+
+ val pool1 = actorOf(
+ new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
+ def lowerBound = 2
+ def upperBound = 5
+ def rampupRate = 0.1
+ def backoffRate = 0.1
+ def backoffThreshold = 0.5
+ def partialFill = true
+ def selectionCount = 1
+ def instance = factory
+ def receive = _route
+ def pressureThreshold = 1
+ def factory = actorOf(new Actor {
+ if (deathCount.get > 5) deathCount.set(0)
+ if (deathCount.get > 0) {deathCount.incrementAndGet;throw new IllegalStateException("keep dying")}
+ def receive = {
+ case BadState ⇒
+ if (keepDying) deathCount.incrementAndGet
+ throw new IllegalStateException
+ case akka.Die =>
+ throw new RuntimeException
+ case _ => pingCount.incrementAndGet
+ }
+ }).start()
+ }).start()
+
+
+ // actor comes back right away
+ pingCount.set(0)
+ keepDying = false
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! BadState
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (1)
+
+ // actor dies completely
+ pingCount.set(0)
+ keepDying = true
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pool1 ! BadState
+ sleepFor(2 seconds)
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
+ pool1 ! "ping"
+ (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
+ pingCount.get must be (2)
+
+ // kill it
+ intercept[RuntimeException](pool1.?(akka.Die).get)
+ }
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
index b838f33efc..fd9bc4d1f7 100644
--- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
+++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala
@@ -11,7 +11,7 @@ class Ticket703Spec extends WordSpec with MustMatchers {
"A ? call to an actor pool" should {
"reuse the proper timeout" in {
val actorPool = actorOf(
- new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
+ new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with BasicNoBackoffFilter {
def lowerBound = 2
def upperBound = 20
def rampupRate = 0.1
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 0874f93c4c..88cfdfe6a2 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -772,7 +772,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
lifeCycle match {
case Temporary ⇒
- shutDownTemporaryActor(this)
+ shutDownTemporaryActor(this, reason)
true
case _ ⇒ // either permanent or none where default is permanent
@@ -815,7 +815,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
val actorRef = i.next
actorRef.lifeCycle match {
// either permanent or none where default is permanent
- case Temporary ⇒ shutDownTemporaryActor(actorRef)
+ case Temporary ⇒ shutDownTemporaryActor(actorRef, reason)
case _ ⇒ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
@@ -841,9 +841,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
case valid ⇒ valid
}
- private def shutDownTemporaryActor(temporaryActor: ActorRef) {
+ private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) {
temporaryActor.stop()
_linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
+ // when this comes down through the handleTrapExit path, we get here when the temp actor is restarted
+ notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason))
// if last temporary actor is gone, then unlink me from supervisor
if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
true
@@ -860,7 +862,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, reason))
else {
lifeCycle match {
- case Temporary ⇒ shutDownTemporaryActor(this)
+ case Temporary ⇒ shutDownTemporaryActor(this, reason)
case _ ⇒ dispatcher.resume(this) //Resume processing for this actor
}
}
diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala
index 249d087ed6..ea280adcae 100644
--- a/akka-actor/src/main/scala/akka/routing/Pool.scala
+++ b/akka-actor/src/main/scala/akka/routing/Pool.scala
@@ -4,8 +4,9 @@
package akka.routing
-import akka.actor.{ Actor, ActorRef, PoisonPill }
+import akka.actor.{ Actor, ActorRef, PoisonPill, Death, MaximumNumberOfRestartsWithinTimeRangeReached }
import akka.dispatch.{ Promise }
+import akka.config.Supervision._
/**
* Actor pooling
@@ -35,44 +36,102 @@ object ActorPool {
* Defines the nature of an actor pool.
*/
trait ActorPool {
- def instance(): ActorRef //Question, Instance of what?
- def capacity(delegates: Seq[ActorRef]): Int //Question, What is the semantics of this return value?
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] //Question, Why does select return this instead of an ordered Set?
+ /**
+ * Adds a new actor to the pool. The DefaultActorPool implementation will start and link (supervise) this actor.
+ * This method is invoked whenever the pool determines it must boost capacity.
+ * @return A new actor for the pool
+ */
+ def instance(): ActorRef
+ /**
+ * Returns the overall desired change in pool capacity. This method is used by non-static pools as the means
+ * for the capacity strategy to influence the pool.
+ * @param _delegates The current sequence of pooled actors
+ * @return the number of delegates by which the pool should be adjusted (positive, negative or zero)
+ */
+ def capacity(delegates: Seq[ActorRef]): Int
+ /**
+ * Provides the results of the selector, one or more actors, to which an incoming message is forwarded.
+ * This method returns an iterator since a selector might return more than one actor to handle the message.
+ * You might want to do this to perform redundant processing of particularly error-prone messages.
+ * @param _delegates The current sequence of pooled actors
+ * @return a list of actors to which the message will be delivered
+ */
+ def select(delegates: Seq[ActorRef]): Seq[ActorRef]
}
/**
- * A default implementation of a pool, on each message to route,
- * - checks the current capacity and adjusts accordingly if needed
- * - routes the incoming message to a selection set of delegate actors
+ * Defines the configuration options for how the pool supervises the actors.
*/
-trait DefaultActorPool extends ActorPool { this: Actor ⇒
+trait ActorPoolSupervisionConfig {
+ /**
+ * Defines the default fault handling strategy to be employed by the pool.
+ */
+ def poolFaultHandler: FaultHandlingStrategy
+}
+
+/**
+ * Provides a default implementation of the supervision configuration by
+ * defining a One-for-One fault handling strategy, trapping exceptions,
+ * limited to 5 retries within 1 second.
+ *
+ * This is just a basic strategy and implementors are encouraged to define
+ * something more appropriate for their needs.
+ */
+trait DefaultActorPoolSupervisionConfig extends ActorPoolSupervisionConfig {
+ def poolFaultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
+}
+
+/**
+ * A default implementation of a pool that:
+ * First, invokes the pool's capacitor that tells it, based on the current delegate count
+ * and it's own heuristic by how many delegates the pool should be resized. Resizing can
+ * can be incremental, decremental or flat. If there is a change to capacity, new delegates
+ * are added or existing ones are removed. Removed actors are sent the PoisonPill message.
+ * New actors are automatically started and linked. The pool supervises the actors and will
+ * use the fault handling strategy specified by the mixed-in ActorPoolSupervisionConfig.
+ * Pooled actors may be any lifecycle. If you're testing pool sizes during runtime, take a
+ * look at the unit tests... Any delegate with a Permanent lifecycle will be
+ * restarted and the pool size will be level with what it was prior to the fault. In just
+ * about every other case, e.g. the delegates are Temporary or the delegate cannot be
+ * restarted within the time interval specified in the fault handling strategy, the pool will
+ * be temporarily shy by that actor (it will have been removed by not back-filled). The
+ * back-fill if any is required, will occur on the next message [as usual].
+ *
+ * Second, invokes the pool's selector that returns a list of delegates that are to receive
+ * the incoming message. Selectors may return more than one actor. If partialFill
+ * is true then it might also the case that fewer than number of desired actors will be
+ * returned.
+ *
+ * Lastly, routes by forwarding, the incoming message to each delegate in the selected set.
+ */
+trait DefaultActorPool extends ActorPool { this: Actor with ActorPoolSupervisionConfig ⇒
import ActorPool._
- import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
- protected var _delegates = Vector[ActorRef]()
- private var _lastCapacityChange = 0
- private var _lastSelectorCount = 0
+ protected[akka] var _delegates = Vector[ActorRef]()
- override def postStop() = _delegates foreach { delegate ⇒
- try {
- delegate ! PoisonPill
- } catch { case e: Exception ⇒ } //Ignore any exceptions here
+ override def preStart() {
+ self.faultHandler = poolFaultHandler
+ }
+ override def postStop() {
+ _delegates foreach { delegate ⇒
+ try {
+ delegate ! PoisonPill
+ } catch { case e: Exception ⇒ } //Ignore any exceptions here
+ }
}
protected def _route(): Receive = {
// for testing...
case Stat ⇒
self reply_? Stats(_delegates length)
- case max: MaximumNumberOfRestartsWithinTimeRangeReached ⇒
- _delegates = _delegates filterNot { _.uuid == max.victim.uuid }
+ case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒
+ _delegates = _delegates filterNot { _.uuid == victim.uuid }
+ case Death(victim, _) =>
+ _delegates = _delegates filterNot { _.uuid == victim.uuid }
case msg ⇒
resizeIfAppropriate()
- select(_delegates) match {
- case (selectedDelegates, count) ⇒
- _lastSelectorCount = count
- selectedDelegates foreach { _ forward msg } //Should we really send the same message to several actors?
- }
+ select(_delegates) foreach { _ forward msg }
}
private def resizeIfAppropriate() {
@@ -95,14 +154,15 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
case _ ⇒ _delegates //No change
}
- _lastCapacityChange = requestedCapacity
_delegates = newDelegates
}
}
/**
* Selectors
- * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
+ *
+ * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool.
+ * Note that it's acceptable to return more than one actor to handle a given message.
*/
/**
@@ -112,7 +172,7 @@ trait SmallestMailboxSelector {
def selectionCount: Int
def partialFill: Boolean
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
+ def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
var set: Seq[ActorRef] = Nil
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
@@ -121,7 +181,7 @@ trait SmallestMailboxSelector {
take -= set.size
}
- (set.iterator, set.size)
+ set
}
}
@@ -134,7 +194,7 @@ trait RoundRobinSelector {
def selectionCount: Int
def partialFill: Boolean
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
+ def select(delegates: Seq[ActorRef]): Seq[ActorRef] = {
val length = delegates.length
val take = if (partialFill) math.min(selectionCount, length)
else selectionCount
@@ -145,13 +205,16 @@ trait RoundRobinSelector {
delegates(_last)
}
- (set.iterator, set.size)
+ set
}
}
/**
* Capacitors
- * These traits define how to alter the size of the pool
+ *
+ * These traits define how to alter the size of the pool according to some desired behavior.
+ * Capacitors are required (minimally) by the pool to establish bounds on the number of delegates
+ * that may exist in the pool.
*/
/**
@@ -163,7 +226,13 @@ trait FixedSizeCapacitor {
}
/**
- * Constrains the pool capacity to a bounded range
+ * Constrains the pool capacity to a bounded range.
+ * This capacitor employs 'pressure capacitors' (sorry for the unforunate confusing naming)
+ * to feed a 'pressure' delta into the capacity function. This measure is
+ * basically the difference between the current pressure level and a pre-established threshhold.
+ * When using this capacitor you must provide a method called 'pressure' or mix-in
+ * one of the PressureCapacitor traits below.
+ *
*/
trait BoundedCapacitor {
def lowerBound: Int
@@ -200,17 +269,39 @@ trait ActiveFuturesPressureCapacitor {
}
/**
+ *
*/
trait CapacityStrategy {
import ActorPool._
+ /**
+ * This method returns a 'pressure level' that will be fed into the capacitor and
+ * evaluated against the established threshhold. For instance, in general, if
+ * the current pressure level exceeds the capacity of the pool, new delegates will
+ * be added.
+ */
def pressure(delegates: Seq[ActorRef]): Int
+ /**
+ * This method can be used to smooth the response of the capacitor by considering
+ * the current pressure and current capacity.
+ */
def filter(pressure: Int, capacity: Int): Int
protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
}
+/**
+ * Use this trait to setup a pool that uses a fixed delegate count.
+ */
trait FixedCapacityStrategy extends FixedSizeCapacitor
+
+/**
+ * Use this trait to setup a pool that may have a variable number of
+ * delegates but always within an established upper and lower limit.
+ *
+ * If mix this into your pool implementation, you must also provide a
+ * PressureCapacitor and a Filter.
+ */
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
/**